diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-29 01:55:36 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-09-29 01:55:36 +0000 |
commit | a14a3a1c1a73c58f4d4682bdf72be3db40ce1eb4 (patch) | |
tree | 8d620b7fc7a06de806f56847b7b7253155b8d855 | |
parent | 4251b6440296ea270e9ae63f57c2966db0fcf0a0 (diff) | |
download | qpid-python-a14a3a1c1a73c58f4d4682bdf72be3db40ce1eb4.tar.gz |
Modified the Sender and Receiver to work with the new addressing strings.
You could now invoke the sender or receiver by passing an addressing string as a program argument.
The sender and receiver could now be utilised more easily as a building block for scripting test cases.
Modified the TestLauncher to also work with addressing strings.
The Receiver was also modified to work as a durable subscriber if specified via a JVM arg.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1002446 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 103 insertions, 130 deletions
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java index 6d33a5f788..b6b1bd29a0 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -32,10 +32,11 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; /** - * A generic receiver which consumers a stream of messages + * A generic receiver which consumes messages * from a given address in a broker (host/port) * until told to stop by killing it. * @@ -63,52 +64,31 @@ import org.apache.qpid.client.AMQConnection; * report_frequency - how often a timestamp is printed * durable * transacted - * tx_size - size of transaction batch in # msgs. + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. */ public class Receiver extends Client implements MessageListener { - // Until addressing is properly supported. - protected enum Reliability { - AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE; - - Reliability getReliability(String s) - { - if (s.equalsIgnoreCase("at_most_once")) - { - return AT_MOST_ONCE; - } - else if (s.equalsIgnoreCase("at_least_once")) - { - return AT_LEAST_ONCE; - } - else - { - return EXACTLY_ONCE; - } - } - }; - long msg_count = 0; int sequence = 0; - boolean sync_rcv = Boolean.getBoolean("sync_rcv"); - boolean uniqueDests = Boolean.getBoolean("unique_dests"); - Reliability reliability = Reliability.EXACTLY_ONCE; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); MessageConsumer consumer; List<Integer> duplicateMessages = new ArrayList<Integer>(); - public Receiver(Connection con,Destination dest) throws Exception + public Receiver(Connection con,String addr) throws Exception { super(con); - reliability = reliability.getReliability(System.getProperty("reliability","exactly_once")); setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(dest); - if (!sync_rcv) + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) { consumer.setMessageListener(this); } - System.out.println("Operating in mode : " + reliability); - System.out.println("Receiving messages from : " + dest); + System.out.println("Receiving messages from : " + addr); } public void onMessage(Message msg) @@ -118,21 +98,30 @@ public class Receiver extends Client implements MessageListener public void run() throws Exception { + long sleepTime = getReportFrequency(); while(true) { - if(sync_rcv) - { - Message msg = consumer.receive(); - handleMessage(msg); + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } } - Thread.sleep(getReportFrequency()); - System.out.println(getDf().format(System.currentTimeMillis()) - + " - messages received : " + msg_count); + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); } } private void handleMessage(Message m) { + if (m == null) { return; } + try { if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) @@ -150,21 +139,18 @@ public class Receiver extends Client implements MessageListener { int seq = m.getIntProperty("sequence"); - if (uniqueDests) + if (checkForDups) { if (seq == 0) { sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); } if (seq < sequence) { duplicateMessages.add(seq); - if (reliability == Reliability.EXACTLY_ONCE) - { - throw new Exception(": Received a duplicate message (expected=" - + sequence + ",received=" + seq + ")" ); - } } else if (seq == sequence) { @@ -199,6 +185,7 @@ public class Receiver extends Client implements MessageListener { String host = "127.0.0.1"; int port = 5672; + String addr = "message_queue"; if (args.length > 0) { @@ -208,16 +195,16 @@ public class Receiver extends Client implements MessageListener { port = Integer.parseInt(args[1]); } - // #3rd argument should be an address - // Any other properties is best configured via jvm args + if (args.length > 2) + { + addr = args[2]; + } AMQConnection con = new AMQConnection( "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"); - // FIXME Need to add support for the new address format - // Then it's trivial to add destination for that. - Receiver rcv = new Receiver(con,null); + Receiver rcv = new Receiver(con,addr); rcv.run(); } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java index de50894491..14b9b7302f 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java @@ -36,6 +36,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.tools.MessageFactory; @@ -86,7 +87,7 @@ public class Sender extends Client protected MessageProducer producer; Random gen = new Random(19770905); - public Sender(Connection con,Destination dest) throws Exception + public Sender(Connection con,String addr) throws Exception { super(con); this.msg_size = Integer.getInteger("msg_size", 100); @@ -94,11 +95,11 @@ public class Sender extends Client this.iterations = Integer.getInteger("iterations", -1); this.sleep_time = Long.getLong("sleep_time", 1000); this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = dest; + this.dest = new AMQAnyDestination(addr); this.producer = getSsn().createProducer(dest); this.replyTo = getSsn().createTemporaryQueue(); - System.out.println("Sending messages to : " + dest); + System.out.println("Sending messages to : " + addr); } /* @@ -171,6 +172,7 @@ public class Sender extends Client { String host = "127.0.0.1"; int port = 5672; + String addr = "message_queue"; if (args.length > 0) { @@ -180,16 +182,16 @@ public class Sender extends Client { port = Integer.parseInt(args[1]); } - // #3rd argument should be an address - // Any other properties is best configured via jvm args + if (args.length > 2) + { + addr = args[2]; + } AMQConnection con = new AMQConnection( "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"); - // FIXME Need to add support for the new address format - // Then it's trivial to add destination for that. - Sender sender = new Sender(con,null); + Sender sender = new Sender(con,addr); sender.run(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java index b55afa7066..560ada244d 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -44,6 +44,7 @@ import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -72,17 +73,15 @@ public class TestLauncher implements ErrorHandler { protected String host = "127.0.0.1"; protected int port = 5672; - protected int session_count = 1; + protected int sessions_per_con = 1; protected int connection_count = 1; - protected long connection_idle_time = 5000; + protected long heartbeat = 5000; protected boolean sender = false; protected boolean receiver = false; + protected boolean useUniqueDests = false; protected String url; - protected String queue_name = "message_queue"; - protected String exchange_name = "amq.direct"; - protected String routing_key = "routing_key"; - protected boolean uniqueDests = false; + protected String address = "my_queue; {create: always}"; protected boolean durable = false; protected String failover = ""; protected AMQConnection controlCon; @@ -99,22 +98,18 @@ public class TestLauncher implements ErrorHandler testName = System.getProperty("test_name","UNKNOWN"); host = System.getProperty("host", "127.0.0.1"); port = Integer.getInteger("port", 5672); - session_count = Integer.getInteger("ssn_count", 1); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); connection_count = Integer.getInteger("con_count", 1); - connection_idle_time = Long.getLong("con_idle_time", 5000); + heartbeat = Long.getLong("heartbeat", 5); sender = Boolean.getBoolean("sender"); receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); - queue_name = System.getProperty("queue_name", "message_queue"); - exchange_name = System.getProperty("exchange_name", "amq.direct"); - routing_key = System.getProperty("routing_key", "routing_key"); failover = System.getProperty("failover", ""); - uniqueDests = Boolean.getBoolean("unique_dests"); durable = Boolean.getBoolean("durable"); url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "?idle_timeout=" + connection_idle_time - + "'"; + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; if (failover.equalsIgnoreCase("failover_exchange")) { @@ -149,12 +144,7 @@ public class TestLauncher implements ErrorHandler controlCon = new AMQConnection(url); controlCon.start(); - controlDest = new AMQQueue(new AMQShortString(""), - new AMQShortString("control"), - new AMQShortString("control"), - false, //exclusive - false, //auto-delete - false); // durable + controlDest = new AMQAnyDestination("control; {create: always}"); // durable // Create the session to setup the messages controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -184,17 +174,17 @@ public class TestLauncher implements ErrorHandler } } - public void start() + public void start(String addr) { try { + if (addr == null) + { + addr = address; + } - int ssn_per_con = session_count; - if (connection_count < session_count) - { - ssn_per_con = session_count/connection_count; - } - + int ssn_per_con = sessions_per_con; + String addrTemp = addr; for (int i = 0; i< connection_count; i++) { AMQConnection con = new AMQConnection(url); @@ -202,16 +192,20 @@ public class TestLauncher implements ErrorHandler clients.add(con); for (int j = 0; j< ssn_per_con; j++) { - String prefix = createPrefix(i,j); - Destination dest = createDest(prefix); + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + if (sender) { - createSender(prefix,con,dest,this); + createSender(index,con,addrTemp,this); } if (receiver) { - createReceiver(prefix,con,dest,this); + createReceiver(index,con,addrTemp,this); } } } @@ -223,7 +217,7 @@ public class TestLauncher implements ErrorHandler } - protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) { Runnable r = new Runnable() { @@ -231,7 +225,7 @@ public class TestLauncher implements ErrorHandler { try { - Receiver rcv = new Receiver(con,dest); + Receiver rcv = new Receiver(con,addr); rcv.setErrorHandler(h); rcv.run(); } @@ -256,7 +250,7 @@ public class TestLauncher implements ErrorHandler t.start(); } - protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) { Runnable r = new Runnable() { @@ -264,7 +258,7 @@ public class TestLauncher implements ErrorHandler { try { - Sender sender = new Sender(con, dest); + Sender sender = new Sender(con, addr); sender.setErrorHandler(h); sender.run(); } @@ -289,7 +283,7 @@ public class TestLauncher implements ErrorHandler t.start(); } - public void handleError(String msg,Exception e) + public synchronized void handleError(String msg,Exception e) { // In case sending the message fails StringBuilder sb = new StringBuilder(); @@ -308,11 +302,13 @@ public class TestLauncher implements ErrorHandler errorMsg.setStringProperty("desc", msg); errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); - synchronized (this) - { - statusSender.send(errorMsg); - } - } catch (JMSException e1) { + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { e1.printStackTrace(); } } @@ -332,50 +328,38 @@ public class TestLauncher implements ErrorHandler } /** - * The following are supported. - * - * 1. A producer/consumer pair on a topic or a queue - * 2. A single producer with multiple consumers on topic/queue - * - * Multiple consumers on a topic will result in a private queue - * for each consumers. - * - * We want to avoid multiple producers on the same topic/queue - * as the queues will fill up in no time. + * A basic helper function to modify the subjects by + * appending an index. */ - private Destination createDest(String prefix) + private String modifySubject(String index,String addr) { - Destination dest = null; - if (exchange_name.equals("amq.topic")) + if (addr.indexOf("/") > 0) { - dest = new AMQTopic( - new AMQShortString(exchange_name), - new AMQShortString(uniqueDests ? prefix + routing_key : - routing_key), - false, //auto-delete - null, //queue name - durable); + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); } else { - dest = new AMQQueue( - new AMQShortString(exchange_name), - new AMQShortString(uniqueDests ? prefix + routing_key : - routing_key), - new AMQShortString(uniqueDests ? prefix + queue_name : - queue_name), - false, //exclusive - false, //auto-delete - durable); + addr = addr + "/" + index; } - return dest; + + return addr; } public static void main(String[] args) { final TestLauncher test = new TestLauncher(); test.setUpControlChannel(); - test.start(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { test.cleanup(); } }); |