summaryrefslogtreecommitdiff
path: root/RC5/java/tools/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
commit38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch)
tree3599403c0c9690898f1e336c009a5564c587c732 /RC5/java/tools/src
parenta8960649bcd365ef70a5de7812f5910222388a6d (diff)
downloadqpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/java/tools/src')
-rw-r--r--RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java200
-rw-r--r--RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java857
2 files changed, 1057 insertions, 0 deletions
diff --git a/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
new file mode 100644
index 0000000000..9ead0c19f2
--- /dev/null
+++ b/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tools;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.jms.FailoverPolicy;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.Hashtable;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.LinkedList;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileInputStream;
+
+public class JNDICheck
+{
+ private static final String QUEUE = "queue.";
+ private static final String TOPIC = "topic.";
+ private static final String DESTINATION = "destination.";
+ private static final String CONNECTION_FACTORY = "connectionfactory.";
+
+ public static void main(String[] args)
+ {
+
+ if (args.length != 1)
+ {
+ usage();
+ }
+
+ String propertyFile = args[0];
+
+ new JNDICheck(propertyFile);
+ }
+
+ private static void usage()
+ {
+ exit("Usage: JNDICheck <JNDI Config file>", 0);
+ }
+
+ private static void exit(String message, int exitCode)
+ {
+ System.err.println(message);
+ System.exit(exitCode);
+ }
+
+ private static String JAVA_NAMING = "java.naming.factory.initial";
+
+ Context _context = null;
+ Hashtable _environment = null;
+
+ public JNDICheck(String propertyFile)
+ {
+
+ // Load JNDI properties
+ Properties properties = new Properties();
+
+ try
+ {
+ properties.load(new FileInputStream(new File(propertyFile)));
+ }
+ catch (IOException e)
+ {
+ exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
+ }
+
+ //Create the initial context
+ try
+ {
+
+ System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
+
+ _context = new InitialContext(properties);
+
+ _environment = _context.getEnvironment();
+
+ Enumeration keys = _environment.keys();
+
+ List<String> queues = new LinkedList<String>();
+ List<String> topics = new LinkedList<String>();
+ List<String> destinations = new LinkedList<String>();
+ List<String> connectionFactories = new LinkedList<String>();
+
+ while (keys.hasMoreElements())
+ {
+ String key = keys.nextElement().toString();
+
+ if (key.startsWith(QUEUE))
+ {
+ queues.add(key);
+ }
+ else if (key.startsWith(TOPIC))
+ {
+ topics.add(key);
+ }
+ else if (key.startsWith(DESTINATION))
+ {
+ destinations.add(key);
+ }
+ else if (key.startsWith(CONNECTION_FACTORY))
+ {
+ connectionFactories.add(key);
+ }
+ }
+
+ printHeader(propertyFile);
+ printEntries(QUEUE, queues);
+ printEntries(TOPIC, topics);
+ printEntries(DESTINATION, destinations);
+ printEntries(CONNECTION_FACTORY, connectionFactories);
+
+ }
+ catch (NamingException e)
+ {
+ exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
+ }
+
+ }
+
+ private void printHeader(String file)
+ {
+ print("JNDI file :" + file);
+ }
+
+ private void printEntries(String type, List<String> list)
+ {
+ if (list.size() > 0)
+ {
+ String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
+ print(name + " elements in file:");
+ printList(list);
+ print("");
+ }
+ }
+
+ private void printList(List<String> list)
+ {
+ for (String item : list)
+ {
+ String key = item.substring(item.indexOf('.') + 1);
+
+ try
+ {
+ print(key, _context.lookup(key));
+ }
+ catch (NamingException e)
+ {
+ exit("Error: item " + key + " no longer in context.", 1);
+ }
+ }
+ }
+
+ private void print(String key, Object object)
+ {
+ if (object instanceof AMQDestination)
+ {
+ print(key + ":" + object);
+ }
+ else if (object instanceof AMQConnectionFactory)
+ {
+ AMQConnectionFactory factory = (AMQConnectionFactory) object;
+ print(key + ":Connection");
+ print("ConnectionURL:");
+ print(factory.getConnectionURL().toString());
+ print("FailoverPolicy");
+ print(new FailoverPolicy(factory.getConnectionURL()).toString());
+ print("");
+ }
+ }
+
+ private void print(String msg)
+ {
+ System.out.println(msg);
+ }
+
+}
diff --git a/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
new file mode 100644
index 0000000000..7411e81bd6
--- /dev/null
+++ b/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -0,0 +1,857 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.*;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+
+import static org.apache.qpid.tools.QpidBench.Mode.*;
+
+/**
+ * QpidBench
+ *
+ */
+
+public class QpidBench
+{
+
+ static enum Mode
+ {
+ PUBLISH, CONSUME, BOTH
+ }
+
+ private static class Options
+ {
+ private StringBuilder usage = new StringBuilder("qpid-bench <options>");
+
+ void usage(String name, String description, Object def)
+ {
+ String defval = "";
+ if (def != null)
+ {
+ defval = String.format(" (%s)", def);
+ }
+ usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
+ }
+
+ public String broker = "localhost";
+ public int port = 5672;
+ public long count = 1000000;
+ public long window = 100000;
+ public long sample = window;
+ public int size = 1024;
+ public Mode mode = BOTH;
+ public boolean timestamp = false;
+ public boolean message_id = false;
+ public boolean message_cache = false;
+ public boolean persistent = false;
+ public boolean jms_publish = false;
+ public boolean jms_consume = false;
+ public boolean help = false;
+
+ {
+ usage("-b, --broker", "the broker hostname", broker);
+ }
+
+ public void parse__broker(String b)
+ {
+ this.broker = b;
+ }
+
+ public void parse_b(String b)
+ {
+ parse__broker(b);
+ }
+
+ {
+ usage("-p, --port", "the broker port", port);
+ }
+
+ public void parse__port(String p)
+ {
+ this.port = Integer.parseInt(p);
+ }
+
+ public void parse_p(String p)
+ {
+ parse__port(p);
+ }
+
+ {
+ usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
+ }
+
+ public void parse__count(String c)
+ {
+ this.count = Long.parseLong(c);
+ }
+
+ public void parse_c(String c)
+ {
+ parse__count(c);
+ }
+
+ {
+ usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
+ }
+
+ public void parse__window(String w)
+ {
+ this.window = Long.parseLong(w);
+ }
+
+ public void parse_w(String w)
+ {
+ parse__window(w);
+ }
+
+ {
+ usage("--sample", "print stats after this many messages, 0 disables", sample);
+ }
+
+ public void parse__sample(String s)
+ {
+ this.sample = Long.parseLong(s);
+ }
+
+ {
+ usage("-i, --interval", "sets both --window and --sample", window);
+ }
+
+ public void parse__interval(String i)
+ {
+ this.window = Long.parseLong(i);
+ this.sample = window;
+ }
+
+ public void parse_i(String i)
+ {
+ parse__interval(i);
+ }
+
+ {
+ usage("-s, --size", "the message size", size);
+ }
+
+ public void parse__size(String s)
+ {
+ this.size = Integer.parseInt(s);
+ }
+
+ public void parse_s(String s)
+ {
+ parse__size(s);
+ }
+
+ {
+ usage("-m, --mode", "one of publish, consume, or both", mode);
+ }
+
+ public void parse__mode(String m)
+ {
+ if (m.equalsIgnoreCase("publish"))
+ {
+ this.mode = PUBLISH;
+ }
+ else if (m.equalsIgnoreCase("consume"))
+ {
+ this.mode = CONSUME;
+ }
+ else if (m.equalsIgnoreCase("both"))
+ {
+ this.mode = BOTH;
+ }
+ else
+ {
+ throw new IllegalArgumentException
+ ("must be one of 'publish', 'consume', or 'both'");
+ }
+ }
+
+ public void parse_m(String m)
+ {
+ parse__mode(m);
+ }
+
+ {
+ usage("--timestamp", "set timestamps on each message if true", timestamp);
+ }
+
+ public void parse__timestamp(String t)
+ {
+ this.timestamp = Boolean.parseBoolean(t);
+ }
+
+ {
+ usage("--mesage-id", "set the message-id on each message if true", message_id);
+ }
+
+ public void parse__message_id(String m)
+ {
+ this.message_id = Boolean.parseBoolean(m);
+ }
+
+ {
+ usage("--message-cache", "reuse the same message for each send if true", message_cache);
+ }
+
+ public void parse__message_cache(String c)
+ {
+ this.message_cache = Boolean.parseBoolean(c);
+ }
+
+ {
+ usage("--persistent", "set the delivery-mode to persistent if true", persistent);
+ }
+
+ public void parse__persistent(String p)
+ {
+ this.persistent = Boolean.parseBoolean(p);
+ }
+
+ {
+ usage("--jms-publish", "use the jms client for publish", jms_publish);
+ }
+
+ public void parse__jms_publish(String jp)
+ {
+ this.jms_publish = Boolean.parseBoolean(jp);
+ }
+
+ {
+ usage("--jms-consume", "use the jms client for consume", jms_consume);
+ }
+
+ public void parse__jms_consume(String jc)
+ {
+ this.jms_consume = Boolean.parseBoolean(jc);
+ }
+
+ {
+ usage("--jms", "sets both --jms-publish and --jms-consume", false);
+ }
+
+ public void parse__jms(String j)
+ {
+ this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
+ }
+
+ {
+ usage("-h, --help", "prints this message", null);
+ }
+
+ public void parse__help()
+ {
+ this.help = true;
+ }
+
+ public void parse_h()
+ {
+ parse__help();
+ }
+
+ public String parse(String ... args)
+ {
+ Class klass = getClass();
+ List<String> arguments = new ArrayList<String>();
+ for (int i = 0; i < args.length; i++)
+ {
+ String option = args[i];
+
+ if (!option.startsWith("-"))
+ {
+ arguments.add(option);
+ continue;
+ }
+
+ String method = "parse" + option.replace('-', '_');
+ try
+ {
+ try
+ {
+ Method parser = klass.getMethod(method);
+ parser.invoke(this);
+ }
+ catch (NoSuchMethodException e)
+ {
+ try
+ {
+ Method parser = klass.getMethod(method, String.class);
+
+ String value = null;
+ if (i + 1 < args.length)
+ {
+ value = args[i+1];
+ i++;
+ }
+ else
+ {
+ return option + " requires a value";
+ }
+
+ parser.invoke(this, value);
+ }
+ catch (NoSuchMethodException e2)
+ {
+ return "no such option: " + option;
+ }
+ }
+ }
+ catch (InvocationTargetException e)
+ {
+ Throwable t = e.getCause();
+ return String.format
+ ("error parsing %s: %s: %s", option, t.getClass().getName(),
+ t.getMessage());
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException
+ ("unable to access parse method: " + option, e);
+ }
+ }
+
+ return parseArguments(arguments);
+ }
+
+ public String parseArguments(List<String> arguments)
+ {
+ if (arguments.size() > 0)
+ {
+ String args = arguments.toString();
+ return "unrecognized arguments: " + args.substring(1, args.length() - 1);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public String toString()
+ {
+ Class klass = getClass();
+ Field[] fields = klass.getFields();
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < fields.length; i++)
+ {
+ if (i > 0)
+ {
+ str.append("\n");
+ }
+
+ String name = fields[i].getName();
+ str.append(name);
+ str.append(" = ");
+ Object value;
+ try
+ {
+ value = fields[i].get(this);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException
+ ("unable to access field: " + name, e);
+ }
+ str.append(value);
+ }
+
+ return str.toString();
+ }
+ }
+
+ public static final void main(String[] args) throws Exception
+ {
+ final Options opts = new Options();
+ String error = opts.parse(args);
+ if (error != null)
+ {
+ System.err.println(error);
+ System.exit(-1);
+ return;
+ }
+
+ if (opts.help)
+ {
+ System.out.println(opts.usage);
+ return;
+ }
+
+ System.out.println(opts);
+
+ switch (opts.mode)
+ {
+ case CONSUME:
+ case BOTH:
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ if (opts.jms_consume)
+ {
+ jms_consumer(opts);
+ }
+ else
+ {
+ native_consumer(opts);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }.start();
+ break;
+ }
+
+ switch (opts.mode)
+ {
+ case PUBLISH:
+ case BOTH:
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ if (opts.jms_publish)
+ {
+ jms_publisher(opts);
+ }
+ else
+ {
+ native_publisher(opts);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }.start();
+ break;
+ }
+ }
+
+ private static enum Column
+ {
+ LEFT, RIGHT
+ }
+
+ private static final void sample(Options opts, Column col, String name, long count,
+ long start, long time, long lastTime)
+ {
+ String pfx = "";
+ String sfx = "";
+ if (opts.mode == BOTH)
+ {
+ if (col == Column.RIGHT)
+ {
+ pfx = " -- ";
+ }
+ else
+ {
+ sfx = " --";
+ }
+ }
+
+ if (count == 0)
+ {
+ String stats = String.format("%s: %tc", name, start);
+ System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+ return;
+ }
+
+ double cumulative = 1000 * (double) count / (double) (time - start);
+ double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
+
+ String stats = String.format
+ ("%s: %d %.2f %.2f", name, count, cumulative, interval);
+ System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+ }
+
+ private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
+ {
+ String url = String.format
+ ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
+ opts.broker, opts.port);
+ return new AMQConnection(url);
+ }
+
+ private static final void jms_publisher(Options opts) throws Exception
+ {
+ javax.jms.Connection conn = getJMSConnection(opts);
+
+ javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ Destination dest = ssn.createQueue("test-queue");
+ Destination echo_dest = ssn.createQueue("echo-queue");
+ MessageProducer prod = ssn.createProducer(dest);
+ MessageConsumer cons = ssn.createConsumer(echo_dest);
+ prod.setDisableMessageID(!opts.message_id);
+ prod.setDisableMessageTimestamp(!opts.timestamp);
+ prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < opts.size; i++)
+ {
+ str.append((char) (i % 128));
+ }
+
+ String body = str.toString();
+
+ TextMessage cached = ssn.createTextMessage();
+ cached.setText(body);
+
+ conn.start();
+
+ long count = 0;
+ long lastTime = 0;
+ long start = System.currentTimeMillis();
+ while (opts.count == 0 || count < opts.count)
+ {
+ if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+ {
+ Message echo = cons.receive();
+ }
+
+ if (opts.sample > 0 && (count % opts.sample) == 0)
+ {
+ long time = System.currentTimeMillis();
+ sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
+ lastTime = time;
+ }
+
+ TextMessage m;
+ if (opts.message_cache)
+ {
+ m = cached;
+ }
+ else
+ {
+ m = ssn.createTextMessage();
+ m.setText(body);
+ }
+
+ prod.send(m);
+ count++;
+ }
+
+ conn.close();
+ }
+
+ private static final void jms_consumer(final Options opts) throws Exception
+ {
+ final javax.jms.Connection conn = getJMSConnection(opts);
+ javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ Destination dest = ssn.createQueue("test-queue");
+ Destination echo_dest = ssn.createQueue("echo-queue");
+ MessageConsumer cons = ssn.createConsumer(dest);
+ final MessageProducer prod = ssn.createProducer(echo_dest);
+ prod.setDisableMessageID(true);
+ prod.setDisableMessageTimestamp(true);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ final TextMessage echo = ssn.createTextMessage();
+ echo.setText("ECHO");
+
+ final Object done = new Object();
+ cons.setMessageListener(new MessageListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void onMessage(Message m)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ try
+ {
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ prod.send(echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
+
+ conn.start();
+ synchronized (done)
+ {
+ done.wait();
+ }
+ conn.close();
+ }
+
+ private static final org.apache.qpid.transport.Connection getConnection
+ (Options opts)
+ {
+ org.apache.qpid.transport.Connection conn =
+ new org.apache.qpid.transport.Connection();
+ conn.connect(opts.broker, opts.port, null, "guest", "guest",false);
+ return conn;
+ }
+
+ private static abstract class NativeListener implements SessionListener
+ {
+
+ public void opened(org.apache.qpid.transport.Session ssn) {}
+
+ public void exception(org.apache.qpid.transport.Session ssn,
+ SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(org.apache.qpid.transport.Session ssn) {}
+
+ }
+
+ private static final void native_publisher(Options opts) throws Exception
+ {
+ final long[] echos = { 0 };
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ synchronized (echos)
+ {
+ echos[0]++;
+ echos.notify();
+ }
+ ssn.processed(xfr);
+ }
+ });
+
+ ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+ ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+ MessageProperties cached_mp = new MessageProperties();
+ DeliveryProperties cached_dp = new DeliveryProperties();
+ cached_dp.setRoutingKey("test-queue");
+ cached_dp.setDeliveryMode
+ (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+ int size = opts.size;
+ ByteBuffer body = ByteBuffer.allocate(size);
+ for (int i = 0; i < size; i++)
+ {
+ body.put((byte) i);
+ }
+ body.flip();
+
+ ssn.invoke(new MessageSubscribe()
+ .queue("echo-queue")
+ .destination("echo-queue")
+ .acceptMode(MessageAcceptMode.NONE)
+ .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+ ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
+ ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+ ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+ UUIDGen gen = UUIDs.newGenerator();
+
+ long count = 0;
+ long lastTime = 0;
+ long start = System.currentTimeMillis();
+ while (opts.count == 0 || count < opts.count)
+ {
+ if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+ {
+ synchronized (echos)
+ {
+ while (echos[0] < (count/opts.window))
+ {
+ echos.wait();
+ }
+ }
+ }
+
+ if (opts.sample > 0 && (count % opts.sample) == 0)
+ {
+ long time = System.currentTimeMillis();
+ sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
+ lastTime = time;
+ }
+
+ MessageProperties mp;
+ DeliveryProperties dp;
+ if (opts.message_cache)
+ {
+ mp = cached_mp;
+ dp = cached_dp;
+ }
+ else
+ {
+ mp = new MessageProperties();
+ dp = new DeliveryProperties();
+ dp.setRoutingKey("test-queue");
+ dp.setDeliveryMode
+ (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+ }
+
+ if (opts.message_id)
+ {
+ mp.setMessageId(gen.generate());
+ }
+
+ if (opts.timestamp)
+ {
+ dp.setTimestamp(System.currentTimeMillis());
+ }
+
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp), body.slice());
+ count++;
+ }
+
+ ssn.messageCancel("echo-queue");
+
+ ssn.sync();
+ ssn.close();
+ conn.close();
+ }
+
+ private static final void native_consumer(final Options opts) throws Exception
+ {
+ final DeliveryProperties dp = new DeliveryProperties();
+ final byte[] echo = new byte[0];
+ dp.setRoutingKey("echo-queue");
+ dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
+ final MessageProperties mp = new MessageProperties();
+ final Object done = new Object();
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ ssn.messageTransfer("amq.direct",
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp),
+ echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ ssn.processed(xfr);
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
+
+ ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+ ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+ ssn.invoke(new MessageSubscribe()
+ .queue("test-queue")
+ .destination("test-queue")
+ .acceptMode(MessageAcceptMode.NONE)
+ .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+ ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
+ ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+ ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+ synchronized (done)
+ {
+ done.wait();
+ }
+
+ ssn.messageCancel("test-queue");
+
+ ssn.sync();
+ ssn.close();
+ conn.close();
+ }
+
+}