summaryrefslogtreecommitdiff
path: root/java/testkit/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-11-08 21:35:19 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-11-08 21:35:19 +0000
commitcc3782c7e9544fc46c846bb82c040b1408bec84f (patch)
treed9ded5420d4cc1d445624866f09fa28728648c8f /java/testkit/src
parent5524c803403513ab52f3833834c9c23c0129efc6 (diff)
downloadqpid-python-cc3782c7e9544fc46c846bb82c040b1408bec84f.tar.gz
This is related to rev 1032640
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1032733 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/testkit/src')
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java368
5 files changed, 0 insertions, 962 deletions
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
deleted file mode 100644
index b10129d855..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public abstract class Client implements ExceptionListener
-{
- private Connection con;
- private Session ssn;
- private boolean durable = false;
- private boolean transacted = false;
- private int txSize = 10;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private String contentType = "application/octet-stream";
-
- private long reportFrequency = 60000; // every min
-
- private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- private NumberFormat nf = new DecimalFormat("##.00");
-
- private long startTime = System.currentTimeMillis();
- private ErrorHandler errorHandler = null;
-
- public Client(Connection con) throws Exception
- {
- this.con = con;
- this.con.setExceptionListener(this);
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- txSize = Integer.getInteger("tx_size",10);
- contentType = System.getProperty("content_type","application/octet-stream");
- reportFrequency = Long.getLong("report_frequency", 60000);
- }
-
- public void close()
- {
- try
- {
- con.close();
- }
- catch (Exception e)
- {
- handleError("Error closing connection",e);
- }
- }
-
- public void onException(JMSException e)
- {
- handleError("Connection error",e);
- }
-
- public void setErrorHandler(ErrorHandler h)
- {
- this.errorHandler = h;
- }
-
- public void handleError(String msg,Exception e)
- {
- if (errorHandler != null)
- {
- errorHandler.handleError(msg, e);
- }
- else
- {
- System.err.println(msg);
- e.printStackTrace();
- }
- }
-
- protected Session getSsn()
- {
- return ssn;
- }
-
- protected void setSsn(Session ssn)
- {
- this.ssn = ssn;
- }
-
- protected boolean isDurable()
- {
- return durable;
- }
-
- protected boolean isTransacted()
- {
- return transacted;
- }
-
- protected int getTxSize()
- {
- return txSize;
- }
-
- protected int getAck_mode()
- {
- return ack_mode;
- }
-
- protected String getContentType()
- {
- return contentType;
- }
-
- protected long getReportFrequency()
- {
- return reportFrequency;
- }
-
- protected long getStartTime()
- {
- return startTime;
- }
-
- protected void setStartTime(long startTime)
- {
- this.startTime = startTime;
- }
-
- public DateFormat getDf()
- {
- return df;
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
deleted file mode 100644
index dbc73c404f..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.qpid.testkit;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-public interface ErrorHandler {
-
- public void handleError(String msg,Exception e);
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
deleted file mode 100644
index b4294ee4cc..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * A generic receiver which consumes messages
- * from a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It participates in a feedback loop to ensure the producer
- * doesn't fill up the queue. If it receives an "End" msg
- * it sends a reply to the replyTo address in that msg.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * sync_rcv - Whether to consume sync (instead of using a listener).
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs. *
- * check_for_dups - check for duplicate messages and out of order messages.
- * jms_durable_sub - create a durable subscription instead of a regular subscription.
- */
-public class Receiver extends Client implements MessageListener
-{
- long msg_count = 0;
- int sequence = 0;
- boolean syncRcv = Boolean.getBoolean("sync_rcv");
- boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
- boolean checkForDups = Boolean.getBoolean("check_for_dups");
- MessageConsumer consumer;
- List<Integer> duplicateMessages = new ArrayList<Integer>();
-
- public Receiver(Connection con,String addr) throws Exception
- {
- super(con);
- setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
- if (!syncRcv)
- {
- consumer.setMessageListener(this);
- }
-
- System.out.println("Receiving messages from : " + addr);
- }
-
- public void onMessage(Message msg)
- {
- handleMessage(msg);
- }
-
- public void run() throws Exception
- {
- long sleepTime = getReportFrequency();
- while(true)
- {
- if(syncRcv)
- {
- long t = sleepTime;
- while (t > 0)
- {
- long start = System.currentTimeMillis();
- Message msg = consumer.receive(t);
- t = t - (System.currentTimeMillis() - start);
- handleMessage(msg);
- }
- }
- Thread.sleep(sleepTime);
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
- }
- }
-
- private void handleMessage(Message m)
- {
- if (m == null) { return; }
-
- try
- {
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
- Message controlMsg = getSsn().createTextMessage();
- temp.send(controlMsg);
- if (isTransacted())
- {
- getSsn().commit();
- }
- temp.close();
- }
- else
- {
-
- int seq = m.getIntProperty("sequence");
- if (checkForDups)
- {
- if (seq == 0)
- {
- sequence = 0; // wrap around for each iteration
- System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
- duplicateMessages.clear();
- }
-
- if (seq < sequence)
- {
- duplicateMessages.add(seq);
- }
- else if (seq == sequence)
- {
- sequence++;
- msg_count ++;
- }
- else
- {
- // Multiple publishers are not allowed in this test case.
- // So out of order messages are not allowed.
- throw new Exception(": Received an out of order message (expected="
- + sequence + ",received=" + seq + ")" );
- }
- }
- else
- {
- msg_count ++;
- }
-
- // Please note that this test case doesn't expect duplicates
- // When testing for transactions.
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- handleError("Exception receiving messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Receiver rcv = new Receiver(con,addr);
- rcv.run();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
deleted file mode 100644
index 14b9b7302f..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.tools.MessageFactory;
-
-/**
- * A generic sender which sends a stream of messages
- * to a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It has a feedback loop to ensure it doesn't fill
- * up queues due to a slow consumer.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * msg_size (256)
- * msg_count (10) - # messages before waiting for feedback
- * sleep_time (1000 ms) - sleep time btw each iteration
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs.
- */
-public class Sender extends Client
-{
- protected int msg_size = 256;
- protected int msg_count = 10;
- protected int iterations = -1;
- protected long sleep_time = 1000;
-
- protected Destination dest = null;
- protected Destination replyTo = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- protected MessageProducer producer;
- Random gen = new Random(19770905);
-
- public Sender(Connection con,String addr) throws Exception
- {
- super(con);
- this.msg_size = Integer.getInteger("msg_size", 100);
- this.msg_count = Integer.getInteger("msg_count", 10);
- this.iterations = Integer.getInteger("iterations", -1);
- this.sleep_time = Long.getLong("sleep_time", 1000);
- this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = new AMQAnyDestination(addr);
- this.producer = getSsn().createProducer(dest);
- this.replyTo = getSsn().createTemporaryQueue();
-
- System.out.println("Sending messages to : " + addr);
- }
-
- /*
- * If msg_size not specified it generates a message
- * between 500-1500 bytes.
- */
- protected Message getNextMessage() throws Exception
- {
- int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
- Message msg = (getContentType().equals("text/plain")) ?
- MessageFactory.createTextMessage(getSsn(), s):
- MessageFactory.createBytesMessage(getSsn(), s);
-
- msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT);
- return msg;
- }
-
- public void run()
- {
- try
- {
- boolean infinite = (iterations == -1);
- for (int x=0; infinite || x < iterations; x++)
- {
- long now = System.currentTimeMillis();
- if (now - getStartTime() >= getReportFrequency())
- {
- System.out.println(df.format(now) + " - iterations : " + x);
- setStartTime(now);
- }
-
- for (int i = 0; i < msg_count; i++)
- {
- Message msg = getNextMessage();
- msg.setIntProperty("sequence",i);
- producer.send(msg);
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- TextMessage m = getSsn().createTextMessage("End");
- m.setJMSReplyTo(replyTo);
- producer.send(m);
-
- if (isTransacted())
- {
- getSsn().commit();
- }
-
- MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
- feedbackConsumer.receive();
- feedbackConsumer.close();
- if (isTransacted())
- {
- getSsn().commit();
- }
- Thread.sleep(sleep_time);
- }
- }
- catch (Exception e)
- {
- handleError("Exception sending messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Sender sender = new Sender(con,addr);
- sender.run();
- }
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
deleted file mode 100644
index 560ada244d..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * A basic test case class that could launch a Sender/Receiver
- * or both, each on it's own separate thread.
- *
- * If con_count == ssn_count, then each entity created will have
- * it's own Connection. Else if con_count < ssn_count, then
- * a connection will be shared by ssn_count/con_count # of entities.
- *
- * The if both sender and receiver options are set, it will
- * share a connection.
- *
- * The following options are available as jvm args
- * host, port
- * con_count,ssn_count
- * con_idle_time - which determines heartbeat
- * sender, receiver - booleans which indicate which entity to create.
- * Setting them both is also a valid option.
- */
-public class TestLauncher implements ErrorHandler
-{
- protected String host = "127.0.0.1";
- protected int port = 5672;
- protected int sessions_per_con = 1;
- protected int connection_count = 1;
- protected long heartbeat = 5000;
- protected boolean sender = false;
- protected boolean receiver = false;
- protected boolean useUniqueDests = false;
- protected String url;
-
- protected String address = "my_queue; {create: always}";
- protected boolean durable = false;
- protected String failover = "";
- protected AMQConnection controlCon;
- protected Destination controlDest = null;
- protected Session controlSession = null;
- protected MessageProducer statusSender;
- protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
- protected String testName;
-
- public TestLauncher()
- {
- testName = System.getProperty("test_name","UNKNOWN");
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- sessions_per_con = Integer.getInteger("ssn_per_con", 1);
- connection_count = Integer.getInteger("con_count", 1);
- heartbeat = Long.getLong("heartbeat", 5);
- sender = Boolean.getBoolean("sender");
- receiver = Boolean.getBoolean("receiver");
- useUniqueDests = Boolean.getBoolean("use_unique_dests");
-
- failover = System.getProperty("failover", "");
- durable = Boolean.getBoolean("durable");
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
-
- if (failover.equalsIgnoreCase("failover_exchange"))
- {
- url += "&failover='failover_exchange'";
-
- System.out.println("Failover exchange " + url );
- }
-
- configureLogging();
- }
-
- protected void configureLogging()
- {
- PatternLayout layout = new PatternLayout();
- layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
- BasicConfigurator.configure(new ConsoleAppender(layout));
-
- String logLevel = System.getProperty("log.level","warn");
- String logComponent = System.getProperty("log.comp","org.apache.qpid");
-
- Logger logger = Logger.getLogger(logComponent);
- logger.setLevel(Level.toLevel(logLevel, Level.WARN));
-
- System.out.println("Level " + logger.getLevel());
-
- }
-
- public void setUpControlChannel()
- {
- try
- {
- controlCon = new AMQConnection(url);
- controlCon.start();
-
- controlDest = new AMQAnyDestination("control; {create: always}"); // durable
-
- // Create the session to setup the messages
- controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
- statusSender = controlSession.createProducer(controlDest);
-
- }
- catch (Exception e)
- {
- handleError("Error while setting up the test",e);
- }
- }
-
- public void cleanup()
- {
- try
- {
- controlSession.close();
- controlCon.close();
- for (AMQConnection con : clients)
- {
- con.close();
- }
- }
- catch (Exception e)
- {
- handleError("Error while tearing down the test",e);
- }
- }
-
- public void start(String addr)
- {
- try
- {
- if (addr == null)
- {
- addr = address;
- }
-
- int ssn_per_con = sessions_per_con;
- String addrTemp = addr;
- for (int i = 0; i< connection_count; i++)
- {
- AMQConnection con = new AMQConnection(url);
- con.start();
- clients.add(con);
- for (int j = 0; j< ssn_per_con; j++)
- {
- String index = createPrefix(i,j);
- if (useUniqueDests)
- {
- addrTemp = modifySubject(index,addr);
- }
-
- if (sender)
- {
- createSender(index,con,addrTemp,this);
- }
-
- if (receiver)
- {
- createReceiver(index,con,addrTemp,this);
- }
- }
- }
- }
- catch (Exception e)
- {
- handleError("Exception while setting up the test",e);
- }
-
- }
-
- protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Receiver rcv = new Receiver(con,addr);
- rcv.setErrorHandler(h);
- rcv.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Receiver", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Receive thread",e);
- }
-
- t.setName("ReceiverThread-" + index);
- t.start();
- }
-
- protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Sender sender = new Sender(con, addr);
- sender.setErrorHandler(h);
- sender.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Sender", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Sender thread",e);
- }
-
- t.setName("SenderThread-" + index);
- t.start();
- }
-
- public synchronized void handleError(String msg,Exception e)
- {
- // In case sending the message fails
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
-
- try
- {
- TextMessage errorMsg = controlSession.createTextMessage();
- errorMsg.setStringProperty("status", "error");
- errorMsg.setStringProperty("desc", msg);
- errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
- errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
-
- System.out.println("Msg " + errorMsg);
-
- statusSender.send(errorMsg);
- }
- catch (JMSException e1)
- {
- e1.printStackTrace();
- }
- }
-
- private String serializeStackTrace(Exception e)
- {
- ByteArrayOutputStream bOut = new ByteArrayOutputStream();
- PrintStream printStream = new PrintStream(bOut);
- e.printStackTrace(printStream);
- printStream.close();
- return bOut.toString();
- }
-
- private String createPrefix(int i, int j)
- {
- return String.valueOf(i).concat(String.valueOf(j));
- }
-
- /**
- * A basic helper function to modify the subjects by
- * appending an index.
- */
- private String modifySubject(String index,String addr)
- {
- if (addr.indexOf("/") > 0)
- {
- addr = addr.substring(0,addr.indexOf("/")+1) +
- index +
- addr.substring(addr.indexOf("/")+1,addr.length());
- }
- else if (addr.indexOf(";") > 0)
- {
- addr = addr.substring(0,addr.indexOf(";")) +
- "/" + index +
- addr.substring(addr.indexOf(";"),addr.length());
- }
- else
- {
- addr = addr + "/" + index;
- }
-
- return addr;
- }
-
- public static void main(String[] args)
- {
- final TestLauncher test = new TestLauncher();
- test.setUpControlChannel();
- System.out.println("args.length " + args.length);
- System.out.println("args [0] " + args [0]);
- test.start(args.length > 0 ? args [0] : null);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() { test.cleanup(); }
- });
-
- }
-}