From 0a2270640b1b91915902b60d46cb3dd421218eda Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 4 Jul 2008 17:42:12 +0000 Subject: Addition of tools directory for various Qpid Java tools The first too JNDICheck allows the contents of a JNDI properties file to be parsed and presented as JNDI will process it. Handly for validating the parsing of a ConnectionURL git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674102 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/JNDICheck.java | 200 +++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java new file mode 100644 index 0000000000..1cbcfee148 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tools; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.jms.FailoverPolicy; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.Hashtable; +import java.util.Enumeration; +import java.util.List; +import java.util.LinkedList; +import java.io.IOException; +import java.io.File; +import java.io.FileReader; + +public class JNDICheck +{ + private static final String QUEUE = "queue."; + private static final String TOPIC = "topic."; + private static final String DESTINATION = "destination."; + private static final String CONNECTION_FACTORY = "connectionfactory."; + + public static void main(String[] args) + { + + if (args.length != 1) + { + usage(); + } + + String propertyFile = args[0]; + + new JNDICheck(propertyFile); + } + + private static void usage() + { + exit("Usage: JNDICheck ", 0); + } + + private static void exit(String message, int exitCode) + { + System.err.println(message); + System.exit(exitCode); + } + + private static String JAVA_NAMING = "java.naming.factory.initial"; + + Context _context = null; + Hashtable _environment = null; + + public JNDICheck(String propertyFile) + { + + // Load JNDI properties + Properties properties = new Properties(); + + try + { + properties.load(new FileReader(new File(propertyFile))); + } + catch (IOException e) + { + exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); + } + + //Create the initial context + try + { + + System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); + + _context = new InitialContext(properties); + + _environment = _context.getEnvironment(); + + Enumeration keys = _environment.keys(); + + List queues = new LinkedList(); + List topics = new LinkedList(); + List destinations = new LinkedList(); + List connectionFactories = new LinkedList(); + + while (keys.hasMoreElements()) + { + String key = keys.nextElement().toString(); + + if (key.startsWith(QUEUE)) + { + queues.add(key); + } + else if (key.startsWith(TOPIC)) + { + topics.add(key); + } + else if (key.startsWith(DESTINATION)) + { + destinations.add(key); + } + else if (key.startsWith(CONNECTION_FACTORY)) + { + connectionFactories.add(key); + } + } + + printHeader(propertyFile); + printEntries(QUEUE, queues); + printEntries(TOPIC, topics); + printEntries(DESTINATION, destinations); + printEntries(CONNECTION_FACTORY, connectionFactories); + + } + catch (NamingException e) + { + exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); + } + + } + + private void printHeader(String file) + { + print("JNDI file :" + file); + } + + private void printEntries(String type, List list) + { + if (list.size() > 0) + { + String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); + print(name + " elements in file:"); + printList(list); + print(""); + } + } + + private void printList(List list) + { + for (String item : list) + { + String key = item.substring(item.indexOf('.') + 1); + + try + { + print(key, _context.lookup(key)); + } + catch (NamingException e) + { + exit("Error: item " + key + " no longer in context.", 1); + } + } + } + + private void print(String key, Object object) + { + if (object instanceof AMQDestination) + { + print(key + ":" + object); + } + else if (object instanceof AMQConnectionFactory) + { + AMQConnectionFactory factory = (AMQConnectionFactory) object; + print(key + ":Connection"); + print("ConnectionURL:"); + print(factory.getConnectionURL().toString()); + print("FailoverPolicy"); + print(new FailoverPolicy(factory.getConnectionURL()).toString()); + print(""); + } + } + + private void print(String msg) + { + System.out.println(msg); + } + +} -- cgit v1.2.1 From 1e8b383a86e0b2fb000bd08cc72d044c84e24d2a Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 1 Aug 2008 12:13:47 +0000 Subject: added benchmark tool for java native + jms APIs git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@681666 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/QpidBench.java | 884 +++++++++++++++++++++ 1 file changed, 884 insertions(+) create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java new file mode 100644 index 0000000000..79caba67fd --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -0,0 +1,884 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.*; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.io.IoTransport; + +import static org.apache.qpid.tools.QpidBench.Mode.*; + +/** + * QpidBench + * + */ + +public class QpidBench +{ + + static enum Mode + { + PUBLISH, CONSUME, BOTH + } + + private static class Options + { + private StringBuilder usage = new StringBuilder("qpid-bench "); + + void usage(String name, String description, Object def) + { + String defval = ""; + if (def != null) + { + defval = String.format(" (%s)", def); + } + usage.append(String.format("\n %-15s%-15s %s", name, defval, description)); + } + + public String broker = "localhost"; + public int port = 5672; + public long count = 1000000; + public long window = 100000; + public long sample = window; + public int size = 1024; + public Mode mode = BOTH; + public boolean timestamp = false; + public boolean message_id = false; + public boolean message_cache = false; + public boolean persistent = false; + public boolean jms_publish = false; + public boolean jms_consume = false; + public boolean help = false; + + { + usage("-b, --broker", "the broker hostname", broker); + } + + public void parse__broker(String b) + { + this.broker = b; + } + + public void parse_b(String b) + { + parse__broker(b); + } + + { + usage("-p, --port", "the broker port", port); + } + + public void parse__port(String p) + { + this.port = Integer.parseInt(p); + } + + public void parse_p(String p) + { + parse__port(p); + } + + { + usage("-c, --count", "the number of messages to send and/or receive", count); + } + + public void parse__count(String c) + { + this.count = Long.parseLong(c); + } + + public void parse_c(String c) + { + parse__count(c); + } + + { + usage("-w, --window", "the number of messages to send before blocking", window); + } + + public void parse__window(String w) + { + this.window = Long.parseLong(w); + } + + public void parse_w(String w) + { + parse__window(w); + } + + { + usage("--sample", "print stats after this many messages", sample); + } + + public void parse__sample(String s) + { + this.sample = Long.parseLong(s); + } + + { + usage("-i, --interval", "sets both --window and --sample", window); + } + + public void parse__interval(String i) + { + this.window = Long.parseLong(i); + this.sample = window; + } + + public void parse_i(String i) + { + parse__interval(i); + } + + { + usage("-s, --size", "the message size", size); + } + + public void parse__size(String s) + { + this.size = Integer.parseInt(s); + } + + public void parse_s(String s) + { + parse__size(s); + } + + { + usage("-m, --mode", "one of publish, consume, or both", mode); + } + + public void parse__mode(String m) + { + if (m.equalsIgnoreCase("publish")) + { + this.mode = PUBLISH; + } + else if (m.equalsIgnoreCase("consume")) + { + this.mode = CONSUME; + } + else if (m.equalsIgnoreCase("both")) + { + this.mode = BOTH; + } + else + { + throw new IllegalArgumentException + ("must be one of 'publish', 'consume', or 'both'"); + } + } + + public void parse_m(String m) + { + parse__mode(m); + } + + { + usage("--timestamp", "set timestamps on each message if true", timestamp); + } + + public void parse__timestamp(String t) + { + this.timestamp = Boolean.parseBoolean(t); + } + + { + usage("--mesage-id", "set the message-id on each message if true", message_id); + } + + public void parse__message_id(String m) + { + this.message_id = Boolean.parseBoolean(m); + } + + { + usage("--message-cache", "reuse the same for each send if true", message_cache); + } + + public void parse__message_cache(String c) + { + this.message_cache = Boolean.parseBoolean(c); + } + + { + usage("--persistent", "set the delivery-mode to persistent if true", persistent); + } + + public void parse__persistent(String p) + { + this.persistent = Boolean.parseBoolean(p); + } + + { + usage("--jms-publish", "use the jms client for publish", jms_publish); + } + + public void parse__jms_publish(String jp) + { + this.jms_publish = Boolean.parseBoolean(jp); + } + + { + usage("--jms-consume", "use the jms client for consume", jms_consume); + } + + public void parse__jms_consume(String jc) + { + this.jms_consume = Boolean.parseBoolean(jc); + } + + { + usage("--jms", "sets both --jms-publish and --jms-consume", false); + } + + public void parse__jms(String j) + { + this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); + } + + { + usage("-h, --help", "prints this message", null); + } + + public void parse__help() + { + this.help = true; + } + + public void parse_h() + { + parse__help(); + } + + public String parse(String ... args) + { + Class klass = getClass(); + List arguments = new ArrayList(); + for (int i = 0; i < args.length; i++) + { + String option = args[i]; + + if (!option.startsWith("-")) + { + arguments.add(option); + continue; + } + + String method = "parse" + option.replace('-', '_'); + try + { + try + { + Method parser = klass.getMethod(method); + parser.invoke(this); + } + catch (NoSuchMethodException e) + { + try + { + Method parser = klass.getMethod(method, String.class); + + String value = null; + if (i + 1 < args.length) + { + value = args[i+1]; + i++; + } + else + { + return option + " requires a value"; + } + + parser.invoke(this, value); + } + catch (NoSuchMethodException e2) + { + return "no such option: " + option; + } + } + } + catch (InvocationTargetException e) + { + Throwable t = e.getCause(); + return String.format + ("error parsing %s: %s: %s", option, t.getClass().getName(), + t.getMessage()); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access parse method: " + option, e); + } + } + + return parseArguments(arguments); + } + + public String parseArguments(List arguments) + { + if (arguments.size() > 0) + { + String args = arguments.toString(); + return "unrecognized arguments: " + args.substring(1, args.length() - 1); + } + else + { + return null; + } + } + + public String toString() + { + Class klass = getClass(); + Field[] fields = klass.getFields(); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < fields.length; i++) + { + if (i > 0) + { + str.append("\n"); + } + + String name = fields[i].getName(); + str.append(name); + str.append(" = "); + Object value; + try + { + value = fields[i].get(this); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access field: " + name, e); + } + str.append(value); + } + + return str.toString(); + } + } + + public static final void main(String[] args) throws Exception + { + final Options opts = new Options(); + String error = opts.parse(args); + if (error != null) + { + System.err.println(error); + System.exit(-1); + return; + } + + if (opts.help) + { + System.out.println(opts.usage); + return; + } + + System.out.println(opts); + + switch (opts.mode) + { + case CONSUME: + case BOTH: + new Thread() + { + public void run() + { + try + { + if (opts.jms_consume) + { + jms_consumer(opts); + } + else + { + native_consumer(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }.start(); + break; + } + + switch (opts.mode) + { + case PUBLISH: + case BOTH: + new Thread() + { + public void run() + { + try + { + if (opts.jms_publish) + { + jms_publisher(opts); + } + else + { + native_publisher(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }.start(); + break; + } + } + + private static enum Column + { + LEFT, RIGHT + } + + private static final void sample(Options opts, Column col, String name, long count, + long start, long time, long lastTime) + { + String pfx = ""; + String sfx = ""; + if (opts.mode == BOTH) + { + if (col == Column.RIGHT) + { + pfx = " -- "; + } + else + { + sfx = " --"; + } + } + + if (count == 0) + { + String stats = String.format("%s: %tc", name, start); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + return; + } + + double cumulative = 1000 * (double) count / (double) (time - start); + double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); + + String stats = String.format + ("%s: %d %.2f %.2f", name, count, cumulative, interval); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + } + + private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception + { + String url = String.format + ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", + opts.broker, opts.port); + return new AMQConnection(url); + } + + private static final void jms_publisher(Options opts) throws Exception + { + javax.jms.Connection conn = getJMSConnection(opts); + + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageProducer prod = ssn.createProducer(dest); + MessageConsumer cons = ssn.createConsumer(echo_dest); + prod.setDisableMessageID(!opts.message_id); + prod.setDisableMessageTimestamp(!opts.timestamp); + prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + StringBuilder str = new StringBuilder(); + for (int i = 0; i < opts.size; i++) + { + str.append((char) i); + } + + String body = str.toString(); + + TextMessage cached = ssn.createTextMessage(); + cached.setText(body); + + conn.start(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + Message echo = cons.receive(); + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "JP", count, start, time, lastTime); + lastTime = time; + } + + TextMessage m; + if (opts.message_cache) + { + m = cached; + } + else + { + m = ssn.createTextMessage(); + m.setText(body); + } + + prod.send(m); + count++; + } + + conn.close(); + } + + private static final void jms_consumer(final Options opts) throws Exception + { + final javax.jms.Connection conn = getJMSConnection(opts); + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageConsumer cons = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(echo_dest); + prod.setDisableMessageID(true); + prod.setDisableMessageTimestamp(true); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final TextMessage echo = ssn.createTextMessage(); + echo.setText("ECHO"); + + final Object done = new Object(); + cons.setMessageListener(new MessageListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void onMessage(Message m) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + try + { + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + prod.send(echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); + lastTime = time; + } + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + conn.start(); + synchronized (done) + { + done.wait(); + } + conn.close(); + } + + private static final org.apache.qpid.transport.Connection getConnection + (Options opts, final SessionDelegate delegate) + { + final Object lock = new Object(); + org.apache.qpid.transport.Connection conn = + IoTransport.connect(opts.broker, opts.port, + new ClientDelegate() + { + public SessionDelegate getSessionDelegate() + { + return delegate; + } + public void exception(Throwable t) + { + t.printStackTrace(); + } + public void closed() {} + @Override public void connectionOpenOk(Channel ch, + ConnectionOpenOk ok) + { + synchronized (lock) + { + lock.notify(); + } + } + }); + conn.send(new ProtocolHeader(1, 0, 10)); + + synchronized (lock) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + return conn; + } + + private static final void native_publisher(Options opts) throws Exception + { + final long[] echos = { 0 }; + org.apache.qpid.transport.Connection conn = getConnection + (opts, + new SessionDelegate() { + @Override public void messageTransfer + (org.apache.qpid.transport.Session ssn, + MessageTransfer mt) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(mt); + } + }); + + Channel ch = conn.getChannel(0); + org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes()); + ssn.attach(ch); + ssn.sessionAttach(ssn.getName()); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + MessageProperties cached_mp = new MessageProperties(); + DeliveryProperties cached_dp = new DeliveryProperties(); + cached_dp.setRoutingKey("test-queue"); + cached_dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + int size = opts.size; + ByteBuffer body = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + { + body.put((byte) i); + } + body.flip(); + + ssn.invoke(new MessageSubscribe() + .queue("echo-queue") + .destination("echo-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + synchronized (echos) + { + while (echos[0] < (count/opts.window)) + { + echos.wait(); + } + } + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "NP", count, start, time, lastTime); + lastTime = time; + } + + MessageProperties mp; + DeliveryProperties dp; + if (opts.message_cache) + { + mp = cached_mp; + dp = cached_dp; + } + else + { + mp = new MessageProperties(); + dp = new DeliveryProperties(); + dp.setRoutingKey("test-queue"); + dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + } + + if (opts.message_id) + { + mp.setMessageId(UUID.randomUUID()); + } + + if (opts.timestamp) + { + dp.setTimestamp(System.currentTimeMillis()); + } + + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); + ssn.header(dp, mp); + ssn.data(body.slice()); + ssn.endData(); + count++; + } + + ssn.messageCancel("echo-queue"); + + ssn.sync(); + conn.close(); + } + + private static final void native_consumer(final Options opts) throws Exception + { + final DeliveryProperties dp = new DeliveryProperties(); + final byte[] echo = new byte[0]; + dp.setRoutingKey("echo-queue"); + dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + final MessageProperties mp = new MessageProperties(); + final Object done = new Object(); + org.apache.qpid.transport.Connection conn = getConnection + (opts, + new SessionDelegate() { + + private long count = 0; + private long lastTime = 0; + private long start; + + @Override public void messageTransfer + (org.apache.qpid.transport.Session ssn, + MessageTransfer mt) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED); + ssn.header(dp, mp); + ssn.data(echo); + ssn.endData(); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(mt); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + Channel ch = conn.getChannel(0); + org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes()); + ssn.attach(ch); + ssn.sessionAttach(ssn.getName()); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + ssn.invoke(new MessageSubscribe() + .queue("test-queue") + .destination("test-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + synchronized (done) + { + done.wait(); + } + + ssn.messageCancel("test-queue"); + + ssn.sync(); + conn.close(); + } + +} -- cgit v1.2.1 From 088509a86a16eb2ce8036992375a51eab77178a0 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 1 Aug 2008 12:31:00 +0000 Subject: improved usage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@681674 13f79535-47bb-0310-9956-ffa450edef68 --- .../tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 79caba67fd..9115a72068 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -59,7 +59,7 @@ public class QpidBench { defval = String.format(" (%s)", def); } - usage.append(String.format("\n %-15s%-15s %s", name, defval, description)); + usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); } public String broker = "localhost"; @@ -106,7 +106,7 @@ public class QpidBench } { - usage("-c, --count", "the number of messages to send and/or receive", count); + usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); } public void parse__count(String c) @@ -120,7 +120,7 @@ public class QpidBench } { - usage("-w, --window", "the number of messages to send before blocking", window); + usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); } public void parse__window(String w) @@ -134,7 +134,7 @@ public class QpidBench } { - usage("--sample", "print stats after this many messages", sample); + usage("--sample", "print stats after this many messages, 0 disables", sample); } public void parse__sample(String s) @@ -220,7 +220,7 @@ public class QpidBench } { - usage("--message-cache", "reuse the same for each send if true", message_cache); + usage("--message-cache", "reuse the same message for each send if true", message_cache); } public void parse__message_cache(String c) -- cgit v1.2.1 From f9fd6fc8c5e408ff210a82557b7a9a8a786ccff4 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 4 Aug 2008 09:52:25 +0000 Subject: QPID-1215 : Replaced use of FileReader with FileInputStream git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@682309 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java index 1cbcfee148..9ead0c19f2 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -35,7 +35,7 @@ import java.util.List; import java.util.LinkedList; import java.io.IOException; import java.io.File; -import java.io.FileReader; +import java.io.FileInputStream; public class JNDICheck { @@ -81,7 +81,7 @@ public class JNDICheck try { - properties.load(new FileReader(new File(propertyFile))); + properties.load(new FileInputStream(new File(propertyFile))); } catch (IOException e) { -- cgit v1.2.1 From fe1cbdb8a780f78bf6e249e3d41d8de4cce22777 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 5 Aug 2008 19:33:11 +0000 Subject: Profiling driven changes: - made AMQShortString cache the toString() value - added static initializer to IoTransport to disable use of pooled byte buffers - modified IoSender to permit buffering - removed OutputHandler and eliminated intermediate Frame generation between Disassembler and Sender (IoSender) - made Disassembler take advantage of IoSender's buffering - removed Header and Data as distinct protocol events, added Header and Body members to MessageTransfer - modified Assembler and Disassembler to decode/encode Header and Data directly to/from MessageTransfer - modified Disassembler to only write data if encoding of headers is successful - added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding that is also fast for 7-bit ascii - modified JMSTextMessage to use the Strings.toUTF8 - modified QpidBench to only generate 7-bit ascii when using TextMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@682887 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/tools/QpidBench.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 9115a72068..377df17277 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -524,7 +524,7 @@ public class QpidBench StringBuilder str = new StringBuilder(); for (int i = 0; i < opts.size; i++) { - str.append((char) i); + str.append((char) (i % 128)); } String body = str.toString(); @@ -782,10 +782,8 @@ public class QpidBench dp.setTimestamp(System.currentTimeMillis()); } - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED); - ssn.header(dp, mp); - ssn.data(body.slice()); - ssn.endData(); + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), body.slice()); count++; } @@ -827,10 +825,9 @@ public class QpidBench { ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED); - ssn.header(dp, mp); - ssn.data(echo); - ssn.endData(); + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); } if (sample) -- cgit v1.2.1 From 85c9a087134f6f56105f21caac1fec4dffdbdbbd Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 6 Aug 2008 17:48:25 +0000 Subject: QPID-1221: added customizable UUID generation and switched the default strategy to use nameUUIDFromBytes rather than randomUUID git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@683337 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 377df17277..181cf427d1 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -32,6 +32,8 @@ import javax.jms.*; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; import static org.apache.qpid.tools.QpidBench.Mode.*; @@ -732,6 +734,8 @@ public class QpidBench ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + UUIDGen gen = UUIDs.newGenerator(); + long count = 0; long lastTime = 0; long start = System.currentTimeMillis(); @@ -774,7 +778,7 @@ public class QpidBench if (opts.message_id) { - mp.setMessageId(UUID.randomUUID()); + mp.setMessageId(gen.generate()); } if (opts.timestamp) -- cgit v1.2.1 From a09ed43cc8ed5862996e684b924f3405e09734c3 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Thu, 9 Oct 2008 17:07:59 +0000 Subject: QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703208 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/QpidBench.java | 186 +++++++++------------ 1 file changed, 79 insertions(+), 107 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 181cf427d1..82e05ba816 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -640,71 +640,47 @@ public class QpidBench } private static final org.apache.qpid.transport.Connection getConnection - (Options opts, final SessionDelegate delegate) + (Options opts) { - final Object lock = new Object(); org.apache.qpid.transport.Connection conn = - IoTransport.connect(opts.broker, opts.port, - new ClientDelegate() - { - public SessionDelegate getSessionDelegate() - { - return delegate; - } - public void exception(Throwable t) - { - t.printStackTrace(); - } - public void closed() {} - @Override public void connectionOpenOk(Channel ch, - ConnectionOpenOk ok) - { - synchronized (lock) - { - lock.notify(); - } - } - }); - conn.send(new ProtocolHeader(1, 0, 10)); - - synchronized (lock) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest"); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); } - return conn; + public void closed(org.apache.qpid.transport.Session ssn) {} + } private static final void native_publisher(Options opts) throws Exception { final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(mt); - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -794,6 +770,7 @@ public class QpidBench ssn.messageCancel("echo-queue"); ssn.sync(); + ssn.close(); conn.close(); } @@ -805,57 +782,51 @@ public class QpidBench dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); final MessageProperties mp = new MessageProperties(); final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection - (opts, - new SessionDelegate() { - - private long count = 0; - private long lastTime = 0; - private long start; - - @Override public void messageTransfer - (org.apache.qpid.transport.Session ssn, - MessageTransfer mt) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(mt); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - Channel ch = conn.getChannel(0); - org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); @@ -879,6 +850,7 @@ public class QpidBench ssn.messageCancel("test-queue"); ssn.sync(); + ssn.close(); conn.close(); } -- cgit v1.2.1 From f2a61c95c3a13151ed03dd53ef802a1aa6e44c6b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 23 Oct 2008 19:44:12 +0000 Subject: This is related to QPID-1296. I missed these two files in the previous commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707458 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 82e05ba816..7411e81bd6 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -644,7 +644,7 @@ public class QpidBench { org.apache.qpid.transport.Connection conn = new org.apache.qpid.transport.Connection(); - conn.connect(opts.broker, opts.port, null, "guest", "guest"); + conn.connect(opts.broker, opts.port, null, "guest", "guest",false); return conn; } -- cgit v1.2.1 From fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 21 Nov 2008 16:32:53 +0000 Subject: This is related to QPID-1479. For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require. The ThreadFactory has two implimentations, the default being the java.lang.Threads. The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority. -Dqpid.thread_factory= will decide which thread factory should be loaded. -Dqpid.rt_thread_priority= specifies the gloabl real time thread priority and defaults to 20. You could also set individual thread priorities by adding the nessacery config+code changes. I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719628 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/QpidBench.java | 65 ++++++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 7411e81bd6..4bba7b113d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,23 +20,45 @@ */ package org.apache.qpid.tools; -import java.lang.reflect.InvocationTargetException; +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.UUID; -import javax.jms.*; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import static org.apache.qpid.tools.QpidBench.Mode.*; - /** * QpidBench * @@ -412,7 +434,7 @@ public class QpidBench { case CONSUME: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -432,7 +454,18 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); break; } @@ -440,7 +473,7 @@ public class QpidBench { case PUBLISH: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -460,7 +493,17 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); break; } } -- cgit v1.2.1 From 8417094a3f0c28fef298d57db5616854458b7a8b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 21 Nov 2008 17:57:16 +0000 Subject: Appologies for the sudden checkin without notice, close to the release cycle. Reverting the changes back. Will attach a patch and commit after the release. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719657 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/QpidBench.java | 65 ++++------------------ 1 file changed, 11 insertions(+), 54 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 4bba7b113d..7411e81bd6 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,45 +20,23 @@ */ package org.apache.qpid.tools; -import static org.apache.qpid.tools.QpidBench.Mode.BOTH; -import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; -import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; - -import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; +import java.util.UUID; +import javax.jms.*; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.io.IoTransport; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; +import static org.apache.qpid.tools.QpidBench.Mode.*; + /** * QpidBench * @@ -434,7 +412,7 @@ public class QpidBench { case CONSUME: case BOTH: - Runnable r = new Runnable() + new Thread() { public void run() { @@ -454,18 +432,7 @@ public class QpidBench throw new RuntimeException(e); } } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); + }.start(); break; } @@ -473,7 +440,7 @@ public class QpidBench { case PUBLISH: case BOTH: - Runnable r = new Runnable() + new Thread() { public void run() { @@ -493,17 +460,7 @@ public class QpidBench throw new RuntimeException(e); } } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating publisher thread",e); - } - t.start(); + }.start(); break; } } -- cgit v1.2.1 From fd19e27856105247b669d52a492bfc0ab2beec28 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 13 Jan 2009 18:29:41 +0000 Subject: This is related to QPID-1479 This commit contains themodifications done to the perf test classes to use the thread abstraction patch. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734212 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/QpidBench.java | 65 ++++++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 7411e81bd6..4bba7b113d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -20,23 +20,45 @@ */ package org.apache.qpid.tools; -import java.lang.reflect.InvocationTargetException; +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.UUID; -import javax.jms.*; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import static org.apache.qpid.tools.QpidBench.Mode.*; - /** * QpidBench * @@ -412,7 +434,7 @@ public class QpidBench { case CONSUME: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -432,7 +454,18 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); break; } @@ -440,7 +473,7 @@ public class QpidBench { case PUBLISH: case BOTH: - new Thread() + Runnable r = new Runnable() { public void run() { @@ -460,7 +493,17 @@ public class QpidBench throw new RuntimeException(e); } } - }.start(); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); break; } } -- cgit v1.2.1 From db5ee3f13d26890077fc5028d59344d496f99388 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 9 Feb 2009 05:14:09 +0000 Subject: This is related to QPID-1645 Added support to specify the sasl_mechs as a space separated list in the connection URL. By default it will use PLAIN. You could provide a list of mechs to support or force to use one GASSAPI or CRAM-MD5 by specifying only that in the connection URL. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742267 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java index 9ead0c19f2..2390516ef0 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -187,7 +187,7 @@ public class JNDICheck print("ConnectionURL:"); print(factory.getConnectionURL().toString()); print("FailoverPolicy"); - print(new FailoverPolicy(factory.getConnectionURL()).toString()); + print(new FailoverPolicy(factory.getConnectionURL(),null).toString()); print(""); } } -- cgit v1.2.1 From a7484073368b0334cd174074bc4576f031a5ebe1 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 25 Feb 2009 23:21:13 +0000 Subject: Made the various receive variants check that the server queue is empty before returning null. Also modified AMQQueueBrowser to use receiveNoWait() when browsing queues using 0-10. These changes uncovered numerous second order bugs, mostly in failover. These are also fixed. This fixes QPID-1642 and QPID-1643. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@747963 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 4bba7b113d..9770adceb0 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -696,6 +696,8 @@ public class QpidBench public void opened(org.apache.qpid.transport.Session ssn) {} + public void resumed(org.apache.qpid.transport.Session ssn) {} + public void exception(org.apache.qpid.transport.Session ssn, SessionException exc) { -- cgit v1.2.1 From b1026352ae2413fd637798df1f49a96b5c4a14ab Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 25 May 2009 02:56:14 +0000 Subject: Added a message to indicate the producer and consumer have completed the test. This can be then greped by any automated to test script to identify the end of a test run. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@778265 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 2 ++ 1 file changed, 2 insertions(+) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 9770adceb0..602fcc6321 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -453,6 +453,7 @@ public class QpidBench { throw new RuntimeException(e); } + System.out.println("Consumer Completed"); } }; @@ -492,6 +493,7 @@ public class QpidBench { throw new RuntimeException(e); } + System.out.println("Producer Completed"); } }; Thread t; -- cgit v1.2.1 From 5410ea0c71cc084aa8215604af4080bb6b534fb2 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 11 Nov 2009 00:17:29 +0000 Subject: Added the LatencyTest and PerfTest kit under the tools modules alongside QpidBench. Modified the testkit build.xml to add tools as build dependency as some of the classes in testkit will be using MessageFactory git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@834722 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/tools/LatencyTest.java | 349 +++++++++++++++++++++ .../java/org/apache/qpid/tools/MessageFactory.java | 64 ++++ .../main/java/org/apache/qpid/tools/PerfBase.java | 102 ++++++ .../java/org/apache/qpid/tools/PerfConsumer.java | 267 ++++++++++++++++ .../java/org/apache/qpid/tools/PerfProducer.java | 262 ++++++++++++++++ .../java/org/apache/qpid/tools/TestParams.java | 168 ++++++++++ 6 files changed, 1212 insertions(+) create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java new file mode 100644 index 0000000000..b88b242e6d --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -0,0 +1,349 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * Latency test sends an x number of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y number of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * It is important to have a sufficiently large number for the warmup count to + * ensure the system is in steady state before the test is started. + * + * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) + * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 + * + * The idea is to get a latency sample for the system once it achieves steady state. + * + */ + +public class LatencyTest extends PerfBase implements MessageListener +{ + MessageProducer producer; + MessageConsumer consumer; + Message msg; + byte[] payload; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + double stdDev = 0; + double avgLatency = 0; + boolean warmup_mode = true; + boolean transacted = false; + int transSize = 0; + + final List latencies; + final Lock lock = new ReentrantLock(); + final Condition warmedUp; + final Condition testCompleted; + + public LatencyTest() + { + super(); + warmedUp = lock.newCondition(); + testCompleted = lock.newCondition(); + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + latencies = new ArrayList (params.getMsgCount()); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(params.isDurable()? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (params.isCacheMessage()) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + ((BytesMessage)msg).writeBytes(payload); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + int count = params.getWarmupCount(); + for (int i=0; i < count; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + try + { + lock.lock(); + warmedUp.await(); + } + finally + { + lock.unlock(); + } + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + if (warmup_mode) + { + warmup_mode = false; + try + { + lock.lock(); + warmedUp.signal(); + } + finally + { + lock.unlock(); + } + } + else + { + computeStats(); + } + } + else if (!warmup_mode) + { + long time = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = time - msg.getJMSTimestamp(); + latencies.add(latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + private void computeStats() + { + avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double sigma = 0; + + for (long latency: latencies) + { + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + sigma = sigma + Math.pow(latency - avgLatency,2); + } + + stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); + + try + { + lock.lock(); + testCompleted.signal(); + } + finally + { + lock.unlock(); + } + } + + public void writeToFile() throws Exception + { + String fileName = System.getProperty("file"); + PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); + for (long latency: latencies) + { + writer.println(String.valueOf(latency)); + } + writer.flush(); + writer.close(); + } + + public void printToConsole() + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Standard Deviation : "). + append(df.format(stdDev)). + append(" ms").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % transSize == 0)) + { + session.commit(); + } + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + if (params.isTransacted()) + { + session.commit(); + } + } + + public void tearDown() throws Exception + { + try + { + lock.lock(); + testCompleted.await(); + } + finally + { + lock.unlock(); + } + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() + { + public void run() + { + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); + } + t.start(); + } +} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java new file mode 100644 index 0000000000..8ab1379fce --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java @@ -0,0 +1,64 @@ +package org.apache.qpid.tools; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class MessageFactory +{ + public static Message createBytesMessage(Session ssn, int size) throws JMSException + { + BytesMessage msg = ssn.createBytesMessage(); + msg.writeBytes(createMessagePayload(size).getBytes()); + return msg; + } + + public static Message createTextMessage(Session ssn, int size) throws JMSException + { + TextMessage msg = ssn.createTextMessage(); + msg.setText(createMessagePayload(size)); + return msg; + } + + public static String createMessagePayload(int size) + { + String msgData = "Qpid Test Message"; + + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count <= (size - msgData.length())) + { + buf.append(msgData); + count += msgData.length(); + } + if (count < size) + { + buf.append(msgData, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java new file mode 100644 index 0000000000..88e75fb6a9 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; + +public class PerfBase +{ + TestParams params; + Connection con; + Session session; + Destination dest; + Destination feedbackDest; + DecimalFormat df = new DecimalFormat("###.##"); + + public PerfBase() + { + params = new TestParams(); + } + + public void setUp() throws Exception + { + Hashtable env = new Hashtable(); + env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); + env.put(Context.PROVIDER_URL, params.getProviderURL()); + + Context ctx = null; + try + { + ctx = new InitialContext(env); + } + catch(Exception e) + { + throw new Exception("Error initializing JNDI",e); + + } + + ConnectionFactory conFac = null; + try + { + conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); + } + catch(Exception e) + { + throw new Exception("Error looking up connection factory",e); + } + + con = conFac.createConnection(); + con.start(); + session = con.createSession(params.isTransacted(), + params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + + try + { + dest = (Destination)ctx.lookup( params.isDurable()? + params.getDurableDestination(): + params.getTransientDestination() + ); + } + catch(Exception e) + { + throw new Exception("Error looking up destination",e); + } + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java new file mode 100644 index 0000000000..0ef0455a64 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * hen the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount + * + * Throughput + * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class PerfConsumer extends PerfBase implements MessageListener +{ + MessageConsumer consumer; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput + long startTime = 0; // to measure consumer throughput + long rcvdTime = 0; + boolean transacted = false; + int transSize = 0; + + final Object lock = new Object(); + + public PerfConsumer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + + boolean start = false; + while (!start) + { + Message msg = consumer.receive(); + if (msg instanceof TextMessage) + { + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } + } + } + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + consumer.setMessageListener(this); + } + + public void printResults() throws Exception + { + synchronized (lock) + { + lock.wait(); + } + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Consumer rate : "). + append(df.format(consRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void notifyCompletion(Destination replyTo) throws Exception + { + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) + { + session.commit(); + } + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + notifyCompletion(msg.getJMSReplyTo()); + + synchronized (lock) + { + lock.notifyAll(); + } + } + else + { + rcvdTime = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (rcvdMsgCount == 1) + { + startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); + } + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = rcvdTime - msg.getJMSTimestamp(); + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + printResults(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public static void main(String[] args) + { + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } +} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java new file mode 100644 index 0000000000..015d1e6205 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +import org.apache.qpid.thread.Threading; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + */ +public class PerfProducer extends PerfBase +{ + MessageProducer producer; + Message msg; + byte[] payload; + List payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + + public PerfProducer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + feedbackDest = session.createTemporaryQueue(); + + durable = params.isDurable(); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + cacheMsg = true; + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + } + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + + if (!randomMsgSize) + { + ((BytesMessage)msg).writeBytes(payload); + } + else + { + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + } + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long start = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + } + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void waitForCompletion() throws Exception + { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); + } + + public void tearDown() throws Exception + { + producer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + waitForCompletion(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } +} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java new file mode 100644 index 0000000000..f1b682ff32 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Session; + +public class TestParams +{ + private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; + + private String connectionFactory = "connectionFactory"; + + private String transientDest = "transientQueue"; + + private String durableDest = "durableQueue"; + + private int msg_size = 1024; + + private int msg_type = 1; // not used yet + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private boolean transacted = false; + + private int transaction_size = 1000; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + public TestParams() + { + initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); + providerURL = System.getProperty("java.naming.provider.url",providerURL); + + transientDest = System.getProperty("transDest",transientDest); + durableDest = System.getProperty("durableDest",durableDest); + + msg_size = Integer.getInteger("msg_size", 1024); + msg_type = Integer.getInteger("msg_type",1); + cacheMessage = Boolean.getBoolean("cache_msg"); + disableMessageID = Boolean.getBoolean("disableMessageID"); + disableTimestamp = Boolean.getBoolean("disableTimestamp"); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + transaction_size = Integer.getInteger("trans_size",1000); + ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg_count",msg_count); + warmup_count = Integer.getInteger("warmup_count",warmup_count); + random_msg_size = Boolean.getBoolean("random_msg_size"); + } + + public int getAckMode() + { + return ack_mode; + } + + public String getConnectionFactory() + { + return connectionFactory; + } + + public String getTransientDestination() + { + return transientDest; + } + + public String getDurableDestination() + { + return durableDest; + } + + public String getInitialContextFactory() + { + return initialContextFactory; + } + + public int getMsgCount() + { + return msg_count; + } + + public int getMsgSize() + { + return msg_size; + } + + public int getMsgType() + { + return msg_type; + } + + public boolean isDurable() + { + return durable; + } + + public String getProviderURL() + { + return providerURL; + } + + public boolean isTransacted() + { + return transacted; + } + + public int getTransactionSize() + { + return transaction_size; + } + + public int getWarmupCount() + { + return warmup_count; + } + + public boolean isCacheMessage() + { + return cacheMessage; + } + + public boolean isDisableMessageID() + { + return disableMessageID; + } + + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + public boolean isRandomMsgSize() + { + return random_msg_size; + } + +} -- cgit v1.2.1 From 8e77acf02345b5c16ba14c3de1b9bce4444e600d Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 8 Nov 2010 21:35:19 +0000 Subject: This is related to rev 1032640 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1032733 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/testkit/Client.java | 154 +++++++++ .../java/org/apache/qpid/testkit/ErrorHandler.java | 27 ++ .../java/org/apache/qpid/testkit/Receiver.java | 216 ++++++++++++ .../main/java/org/apache/qpid/testkit/Sender.java | 197 +++++++++++ .../java/org/apache/qpid/testkit/TestLauncher.java | 370 +++++++++++++++++++++ 5 files changed, 964 insertions(+) create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java new file mode 100644 index 0000000000..b10129d855 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +public abstract class Client implements ExceptionListener +{ + private Connection con; + private Session ssn; + private boolean durable = false; + private boolean transacted = false; + private int txSize = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private String contentType = "application/octet-stream"; + + private long reportFrequency = 60000; // every min + + private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + private NumberFormat nf = new DecimalFormat("##.00"); + + private long startTime = System.currentTimeMillis(); + private ErrorHandler errorHandler = null; + + public Client(Connection con) throws Exception + { + this.con = con; + this.con.setExceptionListener(this); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + txSize = Integer.getInteger("tx_size",10); + contentType = System.getProperty("content_type","application/octet-stream"); + reportFrequency = Long.getLong("report_frequency", 60000); + } + + public void close() + { + try + { + con.close(); + } + catch (Exception e) + { + handleError("Error closing connection",e); + } + } + + public void onException(JMSException e) + { + handleError("Connection error",e); + } + + public void setErrorHandler(ErrorHandler h) + { + this.errorHandler = h; + } + + public void handleError(String msg,Exception e) + { + if (errorHandler != null) + { + errorHandler.handleError(msg, e); + } + else + { + System.err.println(msg); + e.printStackTrace(); + } + } + + protected Session getSsn() + { + return ssn; + } + + protected void setSsn(Session ssn) + { + this.ssn = ssn; + } + + protected boolean isDurable() + { + return durable; + } + + protected boolean isTransacted() + { + return transacted; + } + + protected int getTxSize() + { + return txSize; + } + + protected int getAck_mode() + { + return ack_mode; + } + + protected String getContentType() + { + return contentType; + } + + protected long getReportFrequency() + { + return reportFrequency; + } + + protected long getStartTime() + { + return startTime; + } + + protected void setStartTime(long startTime) + { + this.startTime = startTime; + } + + public DateFormat getDf() + { + return df; + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java new file mode 100644 index 0000000000..dbc73c404f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -0,0 +1,27 @@ +package org.apache.qpid.testkit; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +public interface ErrorHandler { + + public void handleError(String msg,Exception e); +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java new file mode 100644 index 0000000000..b4294ee4cc --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +/** + * A generic receiver which consumes messages + * from a given address in a broker (host/port) + * until told to stop by killing it. + * + * It participates in a feedback loop to ensure the producer + * doesn't fill up the queue. If it receives an "End" msg + * it sends a reply to the replyTo address in that msg. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * sync_rcv - Whether to consume sync (instead of using a listener). + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. + */ +public class Receiver extends Client implements MessageListener +{ + long msg_count = 0; + int sequence = 0; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); + MessageConsumer consumer; + List duplicateMessages = new ArrayList(); + + public Receiver(Connection con,String addr) throws Exception + { + super(con); + setSsn(con.createSession(isTransacted(), getAck_mode())); + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) + { + consumer.setMessageListener(this); + } + + System.out.println("Receiving messages from : " + addr); + } + + public void onMessage(Message msg) + { + handleMessage(msg); + } + + public void run() throws Exception + { + long sleepTime = getReportFrequency(); + while(true) + { + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } + } + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); + } + } + + private void handleMessage(Message m) + { + if (m == null) { return; } + + try + { + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) + { + MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); + Message controlMsg = getSsn().createTextMessage(); + temp.send(controlMsg); + if (isTransacted()) + { + getSsn().commit(); + } + temp.close(); + } + else + { + + int seq = m.getIntProperty("sequence"); + if (checkForDups) + { + if (seq == 0) + { + sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); + } + + if (seq < sequence) + { + duplicateMessages.add(seq); + } + else if (seq == sequence) + { + sequence++; + msg_count ++; + } + else + { + // Multiple publishers are not allowed in this test case. + // So out of order messages are not allowed. + throw new Exception(": Received an out of order message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + else + { + msg_count ++; + } + + // Please note that this test case doesn't expect duplicates + // When testing for transactions. + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + handleError("Exception receiving messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Receiver rcv = new Receiver(con,addr); + rcv.run(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java new file mode 100644 index 0000000000..14b9b7302f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.tools.MessageFactory; + +/** + * A generic sender which sends a stream of messages + * to a given address in a broker (host/port) + * until told to stop by killing it. + * + * It has a feedback loop to ensure it doesn't fill + * up queues due to a slow consumer. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * msg_size (256) + * msg_count (10) - # messages before waiting for feedback + * sleep_time (1000 ms) - sleep time btw each iteration + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. + */ +public class Sender extends Client +{ + protected int msg_size = 256; + protected int msg_count = 10; + protected int iterations = -1; + protected long sleep_time = 1000; + + protected Destination dest = null; + protected Destination replyTo = null; + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + + protected MessageProducer producer; + Random gen = new Random(19770905); + + public Sender(Connection con,String addr) throws Exception + { + super(con); + this.msg_size = Integer.getInteger("msg_size", 100); + this.msg_count = Integer.getInteger("msg_count", 10); + this.iterations = Integer.getInteger("iterations", -1); + this.sleep_time = Long.getLong("sleep_time", 1000); + this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); + this.dest = new AMQAnyDestination(addr); + this.producer = getSsn().createProducer(dest); + this.replyTo = getSsn().createTemporaryQueue(); + + System.out.println("Sending messages to : " + addr); + } + + /* + * If msg_size not specified it generates a message + * between 500-1500 bytes. + */ + protected Message getNextMessage() throws Exception + { + int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; + Message msg = (getContentType().equals("text/plain")) ? + MessageFactory.createTextMessage(getSsn(), s): + MessageFactory.createBytesMessage(getSsn(), s); + + msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT + : DeliveryMode.NON_PERSISTENT); + return msg; + } + + public void run() + { + try + { + boolean infinite = (iterations == -1); + for (int x=0; infinite || x < iterations; x++) + { + long now = System.currentTimeMillis(); + if (now - getStartTime() >= getReportFrequency()) + { + System.out.println(df.format(now) + " - iterations : " + x); + setStartTime(now); + } + + for (int i = 0; i < msg_count; i++) + { + Message msg = getNextMessage(); + msg.setIntProperty("sequence",i); + producer.send(msg); + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + TextMessage m = getSsn().createTextMessage("End"); + m.setJMSReplyTo(replyTo); + producer.send(m); + + if (isTransacted()) + { + getSsn().commit(); + } + + MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); + feedbackConsumer.receive(); + feedbackConsumer.close(); + if (isTransacted()) + { + getSsn().commit(); + } + Thread.sleep(sleep_time); + } + } + catch (Exception e) + { + handleError("Exception sending messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Sender sender = new Sender(con,addr); + sender.run(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java new file mode 100644 index 0000000000..72ca48e1c9 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; + +/** + * A basic test case class that could launch a Sender/Receiver + * or both, each on it's own separate thread. + * + * If con_count == ssn_count, then each entity created will have + * it's own Connection. Else if con_count < ssn_count, then + * a connection will be shared by ssn_count/con_count # of entities. + * + * The if both sender and receiver options are set, it will + * share a connection. + * + * The following options are available as jvm args + * host, port + * con_count,ssn_count + * con_idle_time - which determines heartbeat + * sender, receiver - booleans which indicate which entity to create. + * Setting them both is also a valid option. + */ +public class TestLauncher implements ErrorHandler +{ + protected String host = "127.0.0.1"; + protected int port = 5672; + protected int sessions_per_con = 1; + protected int connection_count = 1; + protected long heartbeat = 5000; + protected boolean sender = false; + protected boolean receiver = false; + protected boolean useUniqueDests = false; + protected String url; + + protected String address = "my_queue; {create: always}"; + protected boolean durable = false; + protected String failover = ""; + protected AMQConnection controlCon; + protected Destination controlDest = null; + protected Session controlSession = null; + protected MessageProducer statusSender; + protected List clients = new ArrayList(); + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + protected String testName; + + public TestLauncher() + { + testName = System.getProperty("test_name","UNKNOWN"); + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); + connection_count = Integer.getInteger("con_count", 1); + heartbeat = Long.getLong("heartbeat", 5); + sender = Boolean.getBoolean("sender"); + receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); + + failover = System.getProperty("failover", ""); + durable = Boolean.getBoolean("durable"); + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; + + if (failover.equalsIgnoreCase("failover_exchange")) + { + url += "&failover='failover_exchange'"; + + System.out.println("Failover exchange " + url ); + } + + configureLogging(); + } + + protected void configureLogging() + { + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); + BasicConfigurator.configure(new ConsoleAppender(layout)); + + String logLevel = System.getProperty("log.level","warn"); + String logComponent = System.getProperty("log.comp","org.apache.qpid"); + + Logger logger = Logger.getLogger(logComponent); + logger.setLevel(Level.toLevel(logLevel, Level.WARN)); + + System.out.println("Level " + logger.getLevel()); + + } + + public void setUpControlChannel() + { + try + { + controlCon = new AMQConnection(url); + controlCon.start(); + + controlDest = new AMQAnyDestination("control; {create: always}"); // durable + + // Create the session to setup the messages + controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + statusSender = controlSession.createProducer(controlDest); + + } + catch (Exception e) + { + handleError("Error while setting up the test",e); + } + } + + public void cleanup() + { + try + { + controlSession.close(); + controlCon.close(); + for (AMQConnection con : clients) + { + con.close(); + } + } + catch (Exception e) + { + handleError("Error while tearing down the test",e); + } + } + + public void start(String addr) + { + try + { + if (addr == null) + { + addr = address; + } + + int ssn_per_con = sessions_per_con; + String addrTemp = addr; + for (int i = 0; i< connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + clients.add(con); + for (int j = 0; j< ssn_per_con; j++) + { + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + + if (sender) + { + createSender(index,con,addrTemp,this); + } + + if (receiver) + { + System.out.println("########## Creating receiver ##################"); + + createReceiver(index,con,addrTemp,this); + } + } + } + } + catch (Exception e) + { + handleError("Exception while setting up the test",e); + } + + } + + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Receiver rcv = new Receiver(con,addr); + rcv.setErrorHandler(h); + rcv.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Receiver", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Receive thread",e); + } + + t.setName("ReceiverThread-" + index); + t.start(); + } + + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Sender sender = new Sender(con, addr); + sender.setErrorHandler(h); + sender.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Sender", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Sender thread",e); + } + + t.setName("SenderThread-" + index); + t.start(); + } + + public synchronized void handleError(String msg,Exception e) + { + // In case sending the message fails + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + + try + { + TextMessage errorMsg = controlSession.createTextMessage(); + errorMsg.setStringProperty("status", "error"); + errorMsg.setStringProperty("desc", msg); + errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); + errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { + e1.printStackTrace(); + } + } + + private String serializeStackTrace(Exception e) + { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(bOut); + e.printStackTrace(printStream); + printStream.close(); + return bOut.toString(); + } + + private String createPrefix(int i, int j) + { + return String.valueOf(i).concat(String.valueOf(j)); + } + + /** + * A basic helper function to modify the subjects by + * appending an index. + */ + private String modifySubject(String index,String addr) + { + if (addr.indexOf("/") > 0) + { + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); + } + else + { + addr = addr + "/" + index; + } + + return addr; + } + + public static void main(String[] args) + { + final TestLauncher test = new TestLauncher(); + test.setUpControlChannel(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { test.cleanup(); } + }); + + } +} -- cgit v1.2.1 From 87515049a36ba1cfc24378cb231188a83c9dd5e7 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 14 Feb 2011 15:21:44 +0000 Subject: QPID-3055 As the first step added system properties for host, port and address and got rid of the jndi lookup. There is also a system property for URL which allows a user to specify a fully fledged URL with various connection options like SSL etc.. If the host & port is specified the URL property is ignored. I also added some documentation in the perf_report.sh suggesting recommended settings for performance testing. These are guidelines only and a prospective user needs to experiment with their environment to fine tune the settings. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1070519 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/tools/PerfBase.java | 42 ++++------------ .../java/org/apache/qpid/tools/TestParams.java | 58 +++++++++++----------- 2 files changed, 39 insertions(+), 61 deletions(-) (limited to 'qpid/java/tools/src/main') diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 88e75fb6a9..ac597d17de 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -30,6 +30,9 @@ import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + public class PerfBase { TestParams params; @@ -45,48 +48,21 @@ public class PerfBase } public void setUp() throws Exception - { - Hashtable env = new Hashtable(); - env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); - env.put(Context.PROVIDER_URL, params.getProviderURL()); + { - Context ctx = null; - try + if (params.getHost().equals("") || params.getPort() == -1) { - ctx = new InitialContext(env); + con = new AMQConnection(params.getUrl()); } - catch(Exception e) + else { - throw new Exception("Error initializing JNDI",e); - + con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); } - - ConnectionFactory conFac = null; - try - { - conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); - } - catch(Exception e) - { - throw new Exception("Error looking up connection factory",e); - } - - con = conFac.createConnection(); con.start(); session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - try - { - dest = (Destination)ctx.lookup( params.isDurable()? - params.getDurableDestination(): - params.getTransientDestination() - ); - } - catch(Exception e) - { - throw new Exception("Error looking up destination",e); - } + dest = new AMQAnyDestination(params.getAddress()); } public void handleError(Exception e,String msg) diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index f1b682ff32..89d6462a39 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -24,15 +24,22 @@ import javax.jms.Session; public class TestParams { - private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; - - private String connectionFactory = "connectionFactory"; - - private String transientDest = "transientQueue"; + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; - private String durableDest = "durableQueue"; + private String address = "queue; {create : always}"; private int msg_size = 1024; @@ -60,11 +67,11 @@ public class TestParams public TestParams() { - initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); - providerURL = System.getProperty("java.naming.provider.url",providerURL); - - transientDest = System.getProperty("transDest",transientDest); - durableDest = System.getProperty("durableDest",durableDest); + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address","queue"); msg_size = Integer.getInteger("msg_size", 1024); msg_type = Integer.getInteger("msg_type",1); @@ -80,29 +87,29 @@ public class TestParams random_msg_size = Boolean.getBoolean("random_msg_size"); } - public int getAckMode() + public String getUrl() { - return ack_mode; + return url; } - public String getConnectionFactory() + public String getHost() { - return connectionFactory; + return host; } - public String getTransientDestination() + public int getPort() { - return transientDest; + return port; } - public String getDurableDestination() + public String getAddress() { - return durableDest; + return address; } - public String getInitialContextFactory() + public int getAckMode() { - return initialContextFactory; + return ack_mode; } public int getMsgCount() @@ -125,11 +132,6 @@ public class TestParams return durable; } - public String getProviderURL() - { - return providerURL; - } - public boolean isTransacted() { return transacted; -- cgit v1.2.1