diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-12-19 19:34:45 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-12-19 19:34:45 +0000 |
| commit | 38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch) | |
| tree | 3599403c0c9690898f1e336c009a5564c587c732 /RC5/java/tools/src | |
| parent | a8960649bcd365ef70a5de7812f5910222388a6d (diff) | |
| download | qpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz | |
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/java/tools/src')
| -rw-r--r-- | RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java | 200 | ||||
| -rw-r--r-- | RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 857 |
2 files changed, 1057 insertions, 0 deletions
diff --git a/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java new file mode 100644 index 0000000000..9ead0c19f2 --- /dev/null +++ b/RC5/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tools; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.jms.FailoverPolicy; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.Hashtable; +import java.util.Enumeration; +import java.util.List; +import java.util.LinkedList; +import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; + +public class JNDICheck +{ + private static final String QUEUE = "queue."; + private static final String TOPIC = "topic."; + private static final String DESTINATION = "destination."; + private static final String CONNECTION_FACTORY = "connectionfactory."; + + public static void main(String[] args) + { + + if (args.length != 1) + { + usage(); + } + + String propertyFile = args[0]; + + new JNDICheck(propertyFile); + } + + private static void usage() + { + exit("Usage: JNDICheck <JNDI Config file>", 0); + } + + private static void exit(String message, int exitCode) + { + System.err.println(message); + System.exit(exitCode); + } + + private static String JAVA_NAMING = "java.naming.factory.initial"; + + Context _context = null; + Hashtable _environment = null; + + public JNDICheck(String propertyFile) + { + + // Load JNDI properties + Properties properties = new Properties(); + + try + { + properties.load(new FileInputStream(new File(propertyFile))); + } + catch (IOException e) + { + exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); + } + + //Create the initial context + try + { + + System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); + + _context = new InitialContext(properties); + + _environment = _context.getEnvironment(); + + Enumeration keys = _environment.keys(); + + List<String> queues = new LinkedList<String>(); + List<String> topics = new LinkedList<String>(); + List<String> destinations = new LinkedList<String>(); + List<String> connectionFactories = new LinkedList<String>(); + + while (keys.hasMoreElements()) + { + String key = keys.nextElement().toString(); + + if (key.startsWith(QUEUE)) + { + queues.add(key); + } + else if (key.startsWith(TOPIC)) + { + topics.add(key); + } + else if (key.startsWith(DESTINATION)) + { + destinations.add(key); + } + else if (key.startsWith(CONNECTION_FACTORY)) + { + connectionFactories.add(key); + } + } + + printHeader(propertyFile); + printEntries(QUEUE, queues); + printEntries(TOPIC, topics); + printEntries(DESTINATION, destinations); + printEntries(CONNECTION_FACTORY, connectionFactories); + + } + catch (NamingException e) + { + exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); + } + + } + + private void printHeader(String file) + { + print("JNDI file :" + file); + } + + private void printEntries(String type, List<String> list) + { + if (list.size() > 0) + { + String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); + print(name + " elements in file:"); + printList(list); + print(""); + } + } + + private void printList(List<String> list) + { + for (String item : list) + { + String key = item.substring(item.indexOf('.') + 1); + + try + { + print(key, _context.lookup(key)); + } + catch (NamingException e) + { + exit("Error: item " + key + " no longer in context.", 1); + } + } + } + + private void print(String key, Object object) + { + if (object instanceof AMQDestination) + { + print(key + ":" + object); + } + else if (object instanceof AMQConnectionFactory) + { + AMQConnectionFactory factory = (AMQConnectionFactory) object; + print(key + ":Connection"); + print("ConnectionURL:"); + print(factory.getConnectionURL().toString()); + print("FailoverPolicy"); + print(new FailoverPolicy(factory.getConnectionURL()).toString()); + print(""); + } + } + + private void print(String msg) + { + System.out.println(msg); + } + +} diff --git a/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java new file mode 100644 index 0000000000..7411e81bd6 --- /dev/null +++ b/RC5/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -0,0 +1,857 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.*; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; + +import static org.apache.qpid.tools.QpidBench.Mode.*; + +/** + * QpidBench + * + */ + +public class QpidBench +{ + + static enum Mode + { + PUBLISH, CONSUME, BOTH + } + + private static class Options + { + private StringBuilder usage = new StringBuilder("qpid-bench <options>"); + + void usage(String name, String description, Object def) + { + String defval = ""; + if (def != null) + { + defval = String.format(" (%s)", def); + } + usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); + } + + public String broker = "localhost"; + public int port = 5672; + public long count = 1000000; + public long window = 100000; + public long sample = window; + public int size = 1024; + public Mode mode = BOTH; + public boolean timestamp = false; + public boolean message_id = false; + public boolean message_cache = false; + public boolean persistent = false; + public boolean jms_publish = false; + public boolean jms_consume = false; + public boolean help = false; + + { + usage("-b, --broker", "the broker hostname", broker); + } + + public void parse__broker(String b) + { + this.broker = b; + } + + public void parse_b(String b) + { + parse__broker(b); + } + + { + usage("-p, --port", "the broker port", port); + } + + public void parse__port(String p) + { + this.port = Integer.parseInt(p); + } + + public void parse_p(String p) + { + parse__port(p); + } + + { + usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); + } + + public void parse__count(String c) + { + this.count = Long.parseLong(c); + } + + public void parse_c(String c) + { + parse__count(c); + } + + { + usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); + } + + public void parse__window(String w) + { + this.window = Long.parseLong(w); + } + + public void parse_w(String w) + { + parse__window(w); + } + + { + usage("--sample", "print stats after this many messages, 0 disables", sample); + } + + public void parse__sample(String s) + { + this.sample = Long.parseLong(s); + } + + { + usage("-i, --interval", "sets both --window and --sample", window); + } + + public void parse__interval(String i) + { + this.window = Long.parseLong(i); + this.sample = window; + } + + public void parse_i(String i) + { + parse__interval(i); + } + + { + usage("-s, --size", "the message size", size); + } + + public void parse__size(String s) + { + this.size = Integer.parseInt(s); + } + + public void parse_s(String s) + { + parse__size(s); + } + + { + usage("-m, --mode", "one of publish, consume, or both", mode); + } + + public void parse__mode(String m) + { + if (m.equalsIgnoreCase("publish")) + { + this.mode = PUBLISH; + } + else if (m.equalsIgnoreCase("consume")) + { + this.mode = CONSUME; + } + else if (m.equalsIgnoreCase("both")) + { + this.mode = BOTH; + } + else + { + throw new IllegalArgumentException + ("must be one of 'publish', 'consume', or 'both'"); + } + } + + public void parse_m(String m) + { + parse__mode(m); + } + + { + usage("--timestamp", "set timestamps on each message if true", timestamp); + } + + public void parse__timestamp(String t) + { + this.timestamp = Boolean.parseBoolean(t); + } + + { + usage("--mesage-id", "set the message-id on each message if true", message_id); + } + + public void parse__message_id(String m) + { + this.message_id = Boolean.parseBoolean(m); + } + + { + usage("--message-cache", "reuse the same message for each send if true", message_cache); + } + + public void parse__message_cache(String c) + { + this.message_cache = Boolean.parseBoolean(c); + } + + { + usage("--persistent", "set the delivery-mode to persistent if true", persistent); + } + + public void parse__persistent(String p) + { + this.persistent = Boolean.parseBoolean(p); + } + + { + usage("--jms-publish", "use the jms client for publish", jms_publish); + } + + public void parse__jms_publish(String jp) + { + this.jms_publish = Boolean.parseBoolean(jp); + } + + { + usage("--jms-consume", "use the jms client for consume", jms_consume); + } + + public void parse__jms_consume(String jc) + { + this.jms_consume = Boolean.parseBoolean(jc); + } + + { + usage("--jms", "sets both --jms-publish and --jms-consume", false); + } + + public void parse__jms(String j) + { + this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); + } + + { + usage("-h, --help", "prints this message", null); + } + + public void parse__help() + { + this.help = true; + } + + public void parse_h() + { + parse__help(); + } + + public String parse(String ... args) + { + Class klass = getClass(); + List<String> arguments = new ArrayList<String>(); + for (int i = 0; i < args.length; i++) + { + String option = args[i]; + + if (!option.startsWith("-")) + { + arguments.add(option); + continue; + } + + String method = "parse" + option.replace('-', '_'); + try + { + try + { + Method parser = klass.getMethod(method); + parser.invoke(this); + } + catch (NoSuchMethodException e) + { + try + { + Method parser = klass.getMethod(method, String.class); + + String value = null; + if (i + 1 < args.length) + { + value = args[i+1]; + i++; + } + else + { + return option + " requires a value"; + } + + parser.invoke(this, value); + } + catch (NoSuchMethodException e2) + { + return "no such option: " + option; + } + } + } + catch (InvocationTargetException e) + { + Throwable t = e.getCause(); + return String.format + ("error parsing %s: %s: %s", option, t.getClass().getName(), + t.getMessage()); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access parse method: " + option, e); + } + } + + return parseArguments(arguments); + } + + public String parseArguments(List<String> arguments) + { + if (arguments.size() > 0) + { + String args = arguments.toString(); + return "unrecognized arguments: " + args.substring(1, args.length() - 1); + } + else + { + return null; + } + } + + public String toString() + { + Class klass = getClass(); + Field[] fields = klass.getFields(); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < fields.length; i++) + { + if (i > 0) + { + str.append("\n"); + } + + String name = fields[i].getName(); + str.append(name); + str.append(" = "); + Object value; + try + { + value = fields[i].get(this); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access field: " + name, e); + } + str.append(value); + } + + return str.toString(); + } + } + + public static final void main(String[] args) throws Exception + { + final Options opts = new Options(); + String error = opts.parse(args); + if (error != null) + { + System.err.println(error); + System.exit(-1); + return; + } + + if (opts.help) + { + System.out.println(opts.usage); + return; + } + + System.out.println(opts); + + switch (opts.mode) + { + case CONSUME: + case BOTH: + new Thread() + { + public void run() + { + try + { + if (opts.jms_consume) + { + jms_consumer(opts); + } + else + { + native_consumer(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }.start(); + break; + } + + switch (opts.mode) + { + case PUBLISH: + case BOTH: + new Thread() + { + public void run() + { + try + { + if (opts.jms_publish) + { + jms_publisher(opts); + } + else + { + native_publisher(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }.start(); + break; + } + } + + private static enum Column + { + LEFT, RIGHT + } + + private static final void sample(Options opts, Column col, String name, long count, + long start, long time, long lastTime) + { + String pfx = ""; + String sfx = ""; + if (opts.mode == BOTH) + { + if (col == Column.RIGHT) + { + pfx = " -- "; + } + else + { + sfx = " --"; + } + } + + if (count == 0) + { + String stats = String.format("%s: %tc", name, start); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + return; + } + + double cumulative = 1000 * (double) count / (double) (time - start); + double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); + + String stats = String.format + ("%s: %d %.2f %.2f", name, count, cumulative, interval); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + } + + private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception + { + String url = String.format + ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", + opts.broker, opts.port); + return new AMQConnection(url); + } + + private static final void jms_publisher(Options opts) throws Exception + { + javax.jms.Connection conn = getJMSConnection(opts); + + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageProducer prod = ssn.createProducer(dest); + MessageConsumer cons = ssn.createConsumer(echo_dest); + prod.setDisableMessageID(!opts.message_id); + prod.setDisableMessageTimestamp(!opts.timestamp); + prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + StringBuilder str = new StringBuilder(); + for (int i = 0; i < opts.size; i++) + { + str.append((char) (i % 128)); + } + + String body = str.toString(); + + TextMessage cached = ssn.createTextMessage(); + cached.setText(body); + + conn.start(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + Message echo = cons.receive(); + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "JP", count, start, time, lastTime); + lastTime = time; + } + + TextMessage m; + if (opts.message_cache) + { + m = cached; + } + else + { + m = ssn.createTextMessage(); + m.setText(body); + } + + prod.send(m); + count++; + } + + conn.close(); + } + + private static final void jms_consumer(final Options opts) throws Exception + { + final javax.jms.Connection conn = getJMSConnection(opts); + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageConsumer cons = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(echo_dest); + prod.setDisableMessageID(true); + prod.setDisableMessageTimestamp(true); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final TextMessage echo = ssn.createTextMessage(); + echo.setText("ECHO"); + + final Object done = new Object(); + cons.setMessageListener(new MessageListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void onMessage(Message m) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + try + { + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + prod.send(echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); + lastTime = time; + } + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + conn.start(); + synchronized (done) + { + done.wait(); + } + conn.close(); + } + + private static final org.apache.qpid.transport.Connection getConnection + (Options opts) + { + org.apache.qpid.transport.Connection conn = + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest",false); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(org.apache.qpid.transport.Session ssn) {} + + } + + private static final void native_publisher(Options opts) throws Exception + { + final long[] echos = { 0 }; + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + MessageProperties cached_mp = new MessageProperties(); + DeliveryProperties cached_dp = new DeliveryProperties(); + cached_dp.setRoutingKey("test-queue"); + cached_dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + int size = opts.size; + ByteBuffer body = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + { + body.put((byte) i); + } + body.flip(); + + ssn.invoke(new MessageSubscribe() + .queue("echo-queue") + .destination("echo-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + UUIDGen gen = UUIDs.newGenerator(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + synchronized (echos) + { + while (echos[0] < (count/opts.window)) + { + echos.wait(); + } + } + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "NP", count, start, time, lastTime); + lastTime = time; + } + + MessageProperties mp; + DeliveryProperties dp; + if (opts.message_cache) + { + mp = cached_mp; + dp = cached_dp; + } + else + { + mp = new MessageProperties(); + dp = new DeliveryProperties(); + dp.setRoutingKey("test-queue"); + dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + } + + if (opts.message_id) + { + mp.setMessageId(gen.generate()); + } + + if (opts.timestamp) + { + dp.setTimestamp(System.currentTimeMillis()); + } + + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), body.slice()); + count++; + } + + ssn.messageCancel("echo-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + + private static final void native_consumer(final Options opts) throws Exception + { + final DeliveryProperties dp = new DeliveryProperties(); + final byte[] echo = new byte[0]; + dp.setRoutingKey("echo-queue"); + dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + final MessageProperties mp = new MessageProperties(); + final Object done = new Object(); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + ssn.invoke(new MessageSubscribe() + .queue("test-queue") + .destination("test-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + synchronized (done) + { + done.wait(); + } + + ssn.messageCancel("test-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + +} |
