summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-09-29 01:55:36 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-09-29 01:55:36 +0000
commita14a3a1c1a73c58f4d4682bdf72be3db40ce1eb4 (patch)
tree8d620b7fc7a06de806f56847b7b7253155b8d855
parent4251b6440296ea270e9ae63f57c2966db0fcf0a0 (diff)
downloadqpid-python-a14a3a1c1a73c58f4d4682bdf72be3db40ce1eb4.tar.gz
Modified the Sender and Receiver to work with the new addressing strings.
You could now invoke the sender or receiver by passing an addressing string as a program argument. The sender and receiver could now be utilised more easily as a building block for scripting test cases. Modified the TestLauncher to also work with addressing strings. The Receiver was also modified to work as a durable subscriber if specified via a JVM arg. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1002446 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java87
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java18
-rw-r--r--qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java128
3 files changed, 103 insertions, 130 deletions
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
index 6d33a5f788..b6b1bd29a0 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -32,10 +32,11 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
/**
- * A generic receiver which consumers a stream of messages
+ * A generic receiver which consumes messages
* from a given address in a broker (host/port)
* until told to stop by killing it.
*
@@ -63,52 +64,31 @@ import org.apache.qpid.client.AMQConnection;
* report_frequency - how often a timestamp is printed
* durable
* transacted
- * tx_size - size of transaction batch in # msgs.
+ * tx_size - size of transaction batch in # msgs. *
+ * check_for_dups - check for duplicate messages and out of order messages.
+ * jms_durable_sub - create a durable subscription instead of a regular subscription.
*/
public class Receiver extends Client implements MessageListener
{
- // Until addressing is properly supported.
- protected enum Reliability {
- AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE;
-
- Reliability getReliability(String s)
- {
- if (s.equalsIgnoreCase("at_most_once"))
- {
- return AT_MOST_ONCE;
- }
- else if (s.equalsIgnoreCase("at_least_once"))
- {
- return AT_LEAST_ONCE;
- }
- else
- {
- return EXACTLY_ONCE;
- }
- }
- };
-
long msg_count = 0;
int sequence = 0;
- boolean sync_rcv = Boolean.getBoolean("sync_rcv");
- boolean uniqueDests = Boolean.getBoolean("unique_dests");
- Reliability reliability = Reliability.EXACTLY_ONCE;
+ boolean syncRcv = Boolean.getBoolean("sync_rcv");
+ boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
+ boolean checkForDups = Boolean.getBoolean("check_for_dups");
MessageConsumer consumer;
List<Integer> duplicateMessages = new ArrayList<Integer>();
- public Receiver(Connection con,Destination dest) throws Exception
+ public Receiver(Connection con,String addr) throws Exception
{
super(con);
- reliability = reliability.getReliability(System.getProperty("reliability","exactly_once"));
setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(dest);
- if (!sync_rcv)
+ consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
+ if (!syncRcv)
{
consumer.setMessageListener(this);
}
- System.out.println("Operating in mode : " + reliability);
- System.out.println("Receiving messages from : " + dest);
+ System.out.println("Receiving messages from : " + addr);
}
public void onMessage(Message msg)
@@ -118,21 +98,30 @@ public class Receiver extends Client implements MessageListener
public void run() throws Exception
{
+ long sleepTime = getReportFrequency();
while(true)
{
- if(sync_rcv)
- {
- Message msg = consumer.receive();
- handleMessage(msg);
+ if(syncRcv)
+ {
+ long t = sleepTime;
+ while (t > 0)
+ {
+ long start = System.currentTimeMillis();
+ Message msg = consumer.receive(t);
+ t = t - (System.currentTimeMillis() - start);
+ handleMessage(msg);
+ }
}
- Thread.sleep(getReportFrequency());
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
+ Thread.sleep(sleepTime);
+ System.out.println(getDf().format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
}
}
private void handleMessage(Message m)
{
+ if (m == null) { return; }
+
try
{
if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
@@ -150,21 +139,18 @@ public class Receiver extends Client implements MessageListener
{
int seq = m.getIntProperty("sequence");
- if (uniqueDests)
+ if (checkForDups)
{
if (seq == 0)
{
sequence = 0; // wrap around for each iteration
+ System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
+ duplicateMessages.clear();
}
if (seq < sequence)
{
duplicateMessages.add(seq);
- if (reliability == Reliability.EXACTLY_ONCE)
- {
- throw new Exception(": Received a duplicate message (expected="
- + sequence + ",received=" + seq + ")" );
- }
}
else if (seq == sequence)
{
@@ -199,6 +185,7 @@ public class Receiver extends Client implements MessageListener
{
String host = "127.0.0.1";
int port = 5672;
+ String addr = "message_queue";
if (args.length > 0)
{
@@ -208,16 +195,16 @@ public class Receiver extends Client implements MessageListener
{
port = Integer.parseInt(args[1]);
}
- // #3rd argument should be an address
- // Any other properties is best configured via jvm args
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
AMQConnection con = new AMQConnection(
"amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ host + ":" + port + "'");
- // FIXME Need to add support for the new address format
- // Then it's trivial to add destination for that.
- Receiver rcv = new Receiver(con,null);
+ Receiver rcv = new Receiver(con,addr);
rcv.run();
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
index de50894491..14b9b7302f 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
@@ -36,6 +36,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.tools.MessageFactory;
@@ -86,7 +87,7 @@ public class Sender extends Client
protected MessageProducer producer;
Random gen = new Random(19770905);
- public Sender(Connection con,Destination dest) throws Exception
+ public Sender(Connection con,String addr) throws Exception
{
super(con);
this.msg_size = Integer.getInteger("msg_size", 100);
@@ -94,11 +95,11 @@ public class Sender extends Client
this.iterations = Integer.getInteger("iterations", -1);
this.sleep_time = Long.getLong("sleep_time", 1000);
this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = dest;
+ this.dest = new AMQAnyDestination(addr);
this.producer = getSsn().createProducer(dest);
this.replyTo = getSsn().createTemporaryQueue();
- System.out.println("Sending messages to : " + dest);
+ System.out.println("Sending messages to : " + addr);
}
/*
@@ -171,6 +172,7 @@ public class Sender extends Client
{
String host = "127.0.0.1";
int port = 5672;
+ String addr = "message_queue";
if (args.length > 0)
{
@@ -180,16 +182,16 @@ public class Sender extends Client
{
port = Integer.parseInt(args[1]);
}
- // #3rd argument should be an address
- // Any other properties is best configured via jvm args
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
AMQConnection con = new AMQConnection(
"amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ host + ":" + port + "'");
- // FIXME Need to add support for the new address format
- // Then it's trivial to add destination for that.
- Sender sender = new Sender(con,null);
+ Sender sender = new Sender(con,addr);
sender.run();
}
}
diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
index b55afa7066..560ada244d 100644
--- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -44,6 +44,7 @@ import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
@@ -72,17 +73,15 @@ public class TestLauncher implements ErrorHandler
{
protected String host = "127.0.0.1";
protected int port = 5672;
- protected int session_count = 1;
+ protected int sessions_per_con = 1;
protected int connection_count = 1;
- protected long connection_idle_time = 5000;
+ protected long heartbeat = 5000;
protected boolean sender = false;
protected boolean receiver = false;
+ protected boolean useUniqueDests = false;
protected String url;
- protected String queue_name = "message_queue";
- protected String exchange_name = "amq.direct";
- protected String routing_key = "routing_key";
- protected boolean uniqueDests = false;
+ protected String address = "my_queue; {create: always}";
protected boolean durable = false;
protected String failover = "";
protected AMQConnection controlCon;
@@ -99,22 +98,18 @@ public class TestLauncher implements ErrorHandler
testName = System.getProperty("test_name","UNKNOWN");
host = System.getProperty("host", "127.0.0.1");
port = Integer.getInteger("port", 5672);
- session_count = Integer.getInteger("ssn_count", 1);
+ sessions_per_con = Integer.getInteger("ssn_per_con", 1);
connection_count = Integer.getInteger("con_count", 1);
- connection_idle_time = Long.getLong("con_idle_time", 5000);
+ heartbeat = Long.getLong("heartbeat", 5);
sender = Boolean.getBoolean("sender");
receiver = Boolean.getBoolean("receiver");
+ useUniqueDests = Boolean.getBoolean("use_unique_dests");
- queue_name = System.getProperty("queue_name", "message_queue");
- exchange_name = System.getProperty("exchange_name", "amq.direct");
- routing_key = System.getProperty("routing_key", "routing_key");
failover = System.getProperty("failover", "");
- uniqueDests = Boolean.getBoolean("unique_dests");
durable = Boolean.getBoolean("durable");
url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "?idle_timeout=" + connection_idle_time
- + "'";
+ + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
if (failover.equalsIgnoreCase("failover_exchange"))
{
@@ -149,12 +144,7 @@ public class TestLauncher implements ErrorHandler
controlCon = new AMQConnection(url);
controlCon.start();
- controlDest = new AMQQueue(new AMQShortString(""),
- new AMQShortString("control"),
- new AMQShortString("control"),
- false, //exclusive
- false, //auto-delete
- false); // durable
+ controlDest = new AMQAnyDestination("control; {create: always}"); // durable
// Create the session to setup the messages
controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -184,17 +174,17 @@ public class TestLauncher implements ErrorHandler
}
}
- public void start()
+ public void start(String addr)
{
try
{
+ if (addr == null)
+ {
+ addr = address;
+ }
- int ssn_per_con = session_count;
- if (connection_count < session_count)
- {
- ssn_per_con = session_count/connection_count;
- }
-
+ int ssn_per_con = sessions_per_con;
+ String addrTemp = addr;
for (int i = 0; i< connection_count; i++)
{
AMQConnection con = new AMQConnection(url);
@@ -202,16 +192,20 @@ public class TestLauncher implements ErrorHandler
clients.add(con);
for (int j = 0; j< ssn_per_con; j++)
{
- String prefix = createPrefix(i,j);
- Destination dest = createDest(prefix);
+ String index = createPrefix(i,j);
+ if (useUniqueDests)
+ {
+ addrTemp = modifySubject(index,addr);
+ }
+
if (sender)
{
- createSender(prefix,con,dest,this);
+ createSender(index,con,addrTemp,this);
}
if (receiver)
{
- createReceiver(prefix,con,dest,this);
+ createReceiver(index,con,addrTemp,this);
}
}
}
@@ -223,7 +217,7 @@ public class TestLauncher implements ErrorHandler
}
- protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
{
Runnable r = new Runnable()
{
@@ -231,7 +225,7 @@ public class TestLauncher implements ErrorHandler
{
try
{
- Receiver rcv = new Receiver(con,dest);
+ Receiver rcv = new Receiver(con,addr);
rcv.setErrorHandler(h);
rcv.run();
}
@@ -256,7 +250,7 @@ public class TestLauncher implements ErrorHandler
t.start();
}
- protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
{
Runnable r = new Runnable()
{
@@ -264,7 +258,7 @@ public class TestLauncher implements ErrorHandler
{
try
{
- Sender sender = new Sender(con, dest);
+ Sender sender = new Sender(con, addr);
sender.setErrorHandler(h);
sender.run();
}
@@ -289,7 +283,7 @@ public class TestLauncher implements ErrorHandler
t.start();
}
- public void handleError(String msg,Exception e)
+ public synchronized void handleError(String msg,Exception e)
{
// In case sending the message fails
StringBuilder sb = new StringBuilder();
@@ -308,11 +302,13 @@ public class TestLauncher implements ErrorHandler
errorMsg.setStringProperty("desc", msg);
errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
- synchronized (this)
- {
- statusSender.send(errorMsg);
- }
- } catch (JMSException e1) {
+
+ System.out.println("Msg " + errorMsg);
+
+ statusSender.send(errorMsg);
+ }
+ catch (JMSException e1)
+ {
e1.printStackTrace();
}
}
@@ -332,50 +328,38 @@ public class TestLauncher implements ErrorHandler
}
/**
- * The following are supported.
- *
- * 1. A producer/consumer pair on a topic or a queue
- * 2. A single producer with multiple consumers on topic/queue
- *
- * Multiple consumers on a topic will result in a private queue
- * for each consumers.
- *
- * We want to avoid multiple producers on the same topic/queue
- * as the queues will fill up in no time.
+ * A basic helper function to modify the subjects by
+ * appending an index.
*/
- private Destination createDest(String prefix)
+ private String modifySubject(String index,String addr)
{
- Destination dest = null;
- if (exchange_name.equals("amq.topic"))
+ if (addr.indexOf("/") > 0)
{
- dest = new AMQTopic(
- new AMQShortString(exchange_name),
- new AMQShortString(uniqueDests ? prefix + routing_key :
- routing_key),
- false, //auto-delete
- null, //queue name
- durable);
+ addr = addr.substring(0,addr.indexOf("/")+1) +
+ index +
+ addr.substring(addr.indexOf("/")+1,addr.length());
+ }
+ else if (addr.indexOf(";") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf(";")) +
+ "/" + index +
+ addr.substring(addr.indexOf(";"),addr.length());
}
else
{
- dest = new AMQQueue(
- new AMQShortString(exchange_name),
- new AMQShortString(uniqueDests ? prefix + routing_key :
- routing_key),
- new AMQShortString(uniqueDests ? prefix + queue_name :
- queue_name),
- false, //exclusive
- false, //auto-delete
- durable);
+ addr = addr + "/" + index;
}
- return dest;
+
+ return addr;
}
public static void main(String[] args)
{
final TestLauncher test = new TestLauncher();
test.setUpControlChannel();
- test.start();
+ System.out.println("args.length " + args.length);
+ System.out.println("args [0] " + args [0]);
+ test.start(args.length > 0 ? args [0] : null);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() { test.cleanup(); }
});