diff options
Diffstat (limited to 'java')
29 files changed, 1287 insertions, 588 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 737c8d22c4..b3b6a2877f 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -175,3 +175,4 @@ </broker> + diff --git a/java/broker/pom.xml b/java/broker/pom.xml index e2d7e6b9a3..b27ab657b7 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -248,7 +248,7 @@ </condition> <property name="command" - value="python run-tests -v -I java_failing.txt -b localhost:2100"/> + value="python run-tests -v -I java_failing.txt -b localhost:2110"/> <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>--> <ant antfile="python-test.xml" inheritRefs="true"> diff --git a/java/broker/python-test.xml b/java/broker/python-test.xml index e58ae399b9..5c263e3169 100755 --- a/java/broker/python-test.xml +++ b/java/broker/python-test.xml @@ -43,9 +43,9 @@ under the License. > <arg value="${command}"/> <arg value="-p"/> - <arg value="2100"/> + <arg value="2110"/> <arg value="-m"/> - <arg value="2101"/> + <arg value="2111"/> <classpath refid="maven.test.classpath"/> <sysproperty key="QPID_HOME" value="${broker.dir}"/> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9fb3a5040b..4696ec4453 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -226,7 +226,7 @@ public class AMQChannel BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; //fixme: fudge for QPID-677 properties.getHeaders().keySet(); - + properties.setUserId(protocolSession.getAuthorizedID().getName()); } @@ -378,7 +378,14 @@ public class AMQChannel { _txnContext.rollback(); unsubscribeAllConsumers(session); - requeue(); + try + { + requeue(); + } + catch (AMQException e) + { + _log.error("Caught AMQException whilst attempting to reque:" + e); + } setClosing(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 8ede553464..1a9dc6673a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ public class DefaultExchangeFactory implements ExchangeFactory { _exchangeClassMap.put(type.getName(), type); } - + public Collection<ExchangeType<? extends Exchange>> getRegisteredTypes() { return _exchangeClassMap.values(); @@ -75,6 +75,12 @@ public class DefaultExchangeFactory implements ExchangeFactory public void initialise(Configuration hostConfig) { + + if (hostConfig == null) + { + return; + } + for(Object className : hostConfig.getList("custom-exchanges.class-name")) { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index fa9d83cd7e..543e043bed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -165,7 +165,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter //fixme -- this can be null if (amqProtocolSession != null) { - amqProtocolSession.closeSession(); + try + { + amqProtocolSession.closeSession(); + } + catch (AMQException e) + { + _logger.error("Caught AMQException whilst closingSession:" + e); + } } } @@ -199,7 +206,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else if (throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22fa0fab23..455983c6d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,15 +20,14 @@ */ package org.apache.qpid.server.registry; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; + /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void run() { _logger.info("Shutting down application registries..."); - try - { - synchronized (ApplicationRegistry.class) - { - Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator(); - - while (keyIterator.hasNext()) - { - IApplicationRegistry instance = keyIterator.next(); - - instance.close(); - } - } - } - catch (Exception e) - { - _logger.error("Error shutting down message store: " + e, e); - } + removeAll(); } } @@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { + _logger.error("Error shutting down message store: " + e, e); } finally @@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); + } + } protected ApplicationRegistry(Configuration configuration) { @@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Error configuring application: " + e, e); - //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); + //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); throw new RuntimeException("Unable to create Application Registry", e); } } @@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { - for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); } @@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } - public static void setDefaultApplicationRegistry(String clazz) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 8ccb0be0a8..7a6e0b011f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,27 +20,26 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); public void configure() { @@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { + _closed.getAndSet(true); if (_metaDataMap != null) { _metaDataMap.clear(); @@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(StoreContext context, Long messageId) + public void removeMessage(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); if (_log.isDebugEnabled()) { _log.debug("Removing message with id " + messageId); @@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - if(bodyList == null && lastContentBody) + if (bodyList == null && lastContentBody) { _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); } @@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { + checkNotClosed(); _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); return _metaDataMap.get(messageId); } public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java new file mode 100644 index 0000000000..3d1538c7eb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * NOTE: this class currently extends AMQException but + * we should be using AMQExceptions internally in the code base for Protocol errors hence + * the message store interface should throw a different super class which this should be + * moved to reflect + */ +public class MessageStoreClosedException extends AMQException +{ + public MessageStoreClosedException() + { + super("Message store closed"); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java index 02386e84eb..b6badff24d 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java @@ -3,7 +3,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -252,4 +252,3 @@ public class Client implements MessageListener new Client(); } } - diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 38325a1e41..39b3b80e74 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1303,4 +1303,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); } + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 15c113a05d..42f07f97f9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -83,6 +83,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -219,6 +220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); + /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -387,7 +394,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws JMSException { if (isClosed()) { @@ -611,20 +618,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) { -// i.next().acknowledgeLastDelivered(); -// } - - // get next acknowledgement to server - Long next = i.next().getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + i.next().acknowledgeDelivered(); } - if (lastTag != -1) + if (_transacted) { - acknowledgeMessage(lastTag, true); + // Do the above, but for consumers which have been de-registered since the + // last commit + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).acknowledgeDelivered(); + _removedConsumers.remove(i); + } } // Commits outstanding messages sent and outstanding acknowledgements. @@ -760,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -776,7 +782,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -1676,6 +1682,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + // Consumers that are closed in a transaction must be stored + // so that messages they have received can be acknowledged on commit + if (_transacted) + { + _removedConsumers.add(consumer); + } } } @@ -2445,6 +2458,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + consumer.failedOver(); registerConsumer(consumer, true); } } @@ -2543,17 +2557,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _messageDeliveryLock; } - /** - * Signifies that the session has pending sends to commit. - */ + /** Signifies that the session has pending sends to commit. */ public void markDirty() { _dirty = true; } - /** - * Signifies that the session has no pending sends to commit. - */ + /** Signifies that the session has no pending sends to commit. */ public void markClean() { _dirty = false; @@ -2562,6 +2572,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if failover has occured since the last call to markClean(commit or rollback). + * * @return boolean true if failover has occured. */ public boolean hasFailedOver() @@ -2571,6 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Check to see if any message have been sent in this transaction and have not been commited. + * * @return boolean true if a message has been sent but not commited */ public boolean isDirty() @@ -2624,7 +2636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Reject messages on pre-receive queue - consumer.rollback(); + consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); @@ -2668,6 +2680,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).rollback(); + _removedConsumers.remove(i); + } + setConnectionStopped(isStopped); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ae31f5ebdd..610e0109b1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -255,6 +253,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); + _receivedDeliveryTags.add(msg.getDeliveryTag()); + break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -277,8 +279,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.setInRecovery(false); } - private void acquireReceiving() throws JMSException + /** + * @param immediate if true then return immediately if the connection is failing over + * + * @return boolean if the acquisition was successful + * + * @throws JMSException + * @throws InterruptedException + */ + private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { + if (_connection.isFailingOver()) + { + if (immediate) + { + return false; + } + else + { + _connection.blockUntilNotFailingOver(); + } + } + if (!_receiving.compareAndSet(false, true)) { throw new javax.jms.IllegalStateException("Another thread is already receiving."); @@ -290,6 +312,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } _receivingThread = Thread.currentThread(); + return true; } private void releaseReceiving() @@ -343,7 +366,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer checkPreConditions(); - acquireReceiving(); + try + { + acquireReceiving(false); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + if (isClosed()) + { + return null; + } + } _session.startDistpatcherIfNecessary(); @@ -424,7 +458,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - acquireReceiving(); + try + { + if (!acquireReceiving(true)) + { + //If we couldn't acquire the receiving thread then return null. + // This will occur if failing over. + return null; + } + } + catch (InterruptedException e) + { + /* + * This seems slightly shoddy but should never actually be executed + * since we told acquireReceiving to return immediately and it shouldn't + * block on anything. + */ + + return null; + } _session.startDistpatcherIfNecessary(); @@ -721,12 +773,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetchHigh) + /*( if (++_outstanding >= _prefetchHigh) { _dups_ok_acknowledge_send = true; } - if (_outstanding <= _prefetchLow) + //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. + if (_outstanding < _prefetchLow) { _dups_ok_acknowledge_send = false; } @@ -736,11 +789,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), true); + _outstanding = 0; } } break; - + */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) @@ -777,20 +831,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } @@ -866,11 +911,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() // throws JMSException + public void acknowledge() throws JMSException { - if (!isClosed()) + if (isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + else if (_session.hasFailedOver()) + { + throw new JMSException("has failed over"); + } + else { - Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); while (tags.hasNext()) { @@ -878,10 +930,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer tags.remove(); } } - else - { - throw new IllegalStateException("Consumer is closed"); - } } /** Called on recovery to reset the list of delivery tags */ @@ -951,7 +999,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - // rollback pending messages + rollbackPendingMessages(); + } + + public void rollbackPendingMessages() + { if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) @@ -1016,4 +1068,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _synchronousQueue.clear(); } + + /** to be called when a failover has occured */ + public void failedOver() + { + clearReceiveQueue(); + clearUnackedMessages(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 1badbb601c..2b63475d71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -294,18 +294,4 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } } - public boolean equals(Object o) - { - - if (o instanceof BlockingMethodFrameListener) - { - BlockingMethodFrameListener other = (BlockingMethodFrameListener) o; - - return _channelId == other._channelId; - } - - return false; - } - - } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 4a4f4a0a38..c66603b7a0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -41,17 +41,4 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener return _expectedClass.isInstance(frame); } - public boolean equals(Object o) - { - if (o instanceof SpecificMethodFrameListener) - { - SpecificMethodFrameListener other = (SpecificMethodFrameListener) o; - - // here we need to check if the two classes are the same. - return (_channelId == other._channelId) && (_expectedClass.equals(other._expectedClass)); - } - - return false; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 3257caa796..e8a220f5e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -101,27 +100,21 @@ public class TransportConnection _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") ? "Qpid NIO is new default" : "Sysproperty 'qpidnio' is set")); - - result = new MultiThreadSocketConnector(); } else { _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector } - // Don't have the connector's worker thread wait around for other connections (we only use // one SocketConnector per connection at the moment anyway). This allows short-running // clients (like unit tests) to complete quickly. result.setWorkerTimeout(0); - return result; } }); break; - case VM: { _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); @@ -280,8 +273,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -294,14 +286,11 @@ public class TransportConnection _acceptor.unbindAll(); synchronized (_inVmPipeAddress) { - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) - { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } - } + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 25a9e26285..dca6efba67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -22,15 +22,12 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; - import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +46,7 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - final VmPipeConnector ioConnector = new VmPipeConnector(); + final VmPipeConnector ioConnector = new QpidVmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); cfg.setThreadModel(ReadWriteThreadModel.getInstance()); diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java deleted file mode 100644 index 69684a81ea..0000000000 --- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.qpid.framing; - -import junit.framework.TestCase; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; - -import org.apache.mina.common.ByteBuffer; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -public class SpecificMethodFrameListenerTest extends TestCase -{ - - SpecificMethodFrameListener close1a = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close1b = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class); - SpecificMethodFrameListener close2 = new SpecificMethodFrameListener(2, ChannelCloseOkBody.class); - SpecificMethodFrameListener open1a = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - SpecificMethodFrameListener open1b = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class); - - public void testEquals() - { - //Check that the the same objects are equal - assertEquals("ChannelCloseOKBody a should equal a", close1a, close1a); - assertEquals("ChannelOpenOkBody a should equal a", open1a, open1a); - - //check that the same values in differnt objects are equal - assertEquals("ChannelCloseOKBody b should equal a", close1b, close1a); - assertEquals("ChannelCloseOKBody a should equal b", close1a, close1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b); - - //Chec that different values fail - //Different channels - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close1a.equals(close2)); - assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close2.equals(close1a)); - - //Different Bodies - assertFalse("ChannelCloseOKBody should not equal ChannelOpenOkBody", close1a.equals(open1a)); - assertFalse("ChannelOpenOkBody should not equal ChannelCloseOKBody", open1a.equals(close1a)); - } - - public void testProcessMethod() throws AMQFrameDecodingException - { - ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody(); - ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]); - - assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob)); - assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob)); - - - - - } -} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 224463a446..e45312448c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -418,14 +418,14 @@ public class CommitRollbackTest extends TestCase { _logger.info("Got 2 redelivered, message was prefetched"); _gottwoRedelivered = true; - + } else { - _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); + _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured"); assertFalse("Already received message two", _gottwo); assertFalse("Already received message redelivered two", _gottwoRedelivered); - + _gottwo = true; } } @@ -437,7 +437,7 @@ public class CommitRollbackTest extends TestCase * This test sends two messages receives on of them but doesn't ack it. * The consumer is then closed * the first message should be returned as redelivered. - * the second message should be delivered normally. + * the second message should be delivered normally. * @throws Exception */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception @@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); _logger.info("Closing Consumer"); + _consumer.close(); _logger.info("Creating New consumer"); @@ -465,33 +466,20 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); -// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected. -// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet. - result = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result); -// The first message back will be either 1 or 2 being redelivered - if (result.getJMSRedelivered()) - { - assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); - } - else // or it will be msg 2 arriving the first time due to latency. - { - _logger.info("Message 2 wasn't prefetched so wasn't rejected"); - assertEquals("2", ((TextMessage) result).getText()); - } + // Message 2 may be marked as redelivered if it was prefetched. + result = _consumer.receive(5000); + assertNotNull("Second message was not consumed, but is gone", result); - Message result2 = _consumer.receive(5000); - assertNotNull("test message was consumed and rolled back, but is gone", result2); + // The first message back will be 2, message 1 has been received but not committed + // Closing the consumer does not commit the session. // if this is message 1 then it should be marked as redelivered - if("1".equals(((TextMessage) result2).getText())) + if("1".equals(((TextMessage) result).getText())) { - assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered()); + fail("First message was recieved again"); } - assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() ); - result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..810d12f472 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java index 202ac1a530..cb24102edd 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector // Set the ConnectFuture of the specified session, which will be // removed and notified by AbstractIoFilterChain eventually. -// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture); + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); // Forward the remaining process to the SocketIoProcessor. session.getIoProcessor().addNew(session); diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java index 11c54bb248..03838ca3f1 100644 --- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ private volatile Selector selector, writeSelector; private final Queue newSessions = new Queue(); @@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor this.executor = executor; } - void addNew( SocketSessionImpl session ) throws IOException + void addNew(SocketSessionImpl session) throws IOException { - synchronized( newSessions ) + synchronized (newSessions) { - newSessions.push( session ); + newSessions.push(session); } startupWorker(); @@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor writeSelector.wakeup(); } - void remove( SocketSessionImpl session ) throws IOException + void remove(SocketSessionImpl session) throws IOException { - scheduleRemove( session ); + scheduleRemove(session); startupWorker(); selector.wakeup(); } private void startupWorker() throws IOException { - synchronized(readLock) + synchronized (readLock) { if (readWorker == null) { @@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - synchronized(writeLock) + synchronized (writeLock) { if (writeWorker == null) { @@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } - void flush( SocketSessionImpl session ) + void flush(SocketSessionImpl session) { - scheduleFlush( session ); + scheduleFlush(session); Selector selector = this.writeSelector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - void updateTrafficMask( SocketSessionImpl session ) + void updateTrafficMask(SocketSessionImpl session) { - scheduleTrafficControl( session ); + scheduleTrafficControl(session); Selector selector = this.selector; - if( selector != null ) + if (selector != null) { selector.wakeup(); } } - private void scheduleRemove( SocketSessionImpl session ) + private void scheduleRemove(SocketSessionImpl session) { - synchronized( removingSessions ) + synchronized (removingSessions) { - removingSessions.push( session ); + removingSessions.push(session); } } - private void scheduleFlush( SocketSessionImpl session ) + private void scheduleFlush(SocketSessionImpl session) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { //if flushingSessions grows to contain Integer.MAX_VALUE sessions // then this will fail. @@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } - private void scheduleTrafficControl( SocketSessionImpl session ) + private void scheduleTrafficControl(SocketSessionImpl session) { - synchronized( trafficControllingSessions ) + synchronized (trafficControllingSessions) { - trafficControllingSessions.push( session ); + trafficControllingSessions.push(session); } } private void doAddNewReader() throws InterruptedException { - if( newSessions.isEmpty() ) + if (newSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( newSessions ) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } - if( session == null ) + if (session == null) { break; } @@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { - ch.configureBlocking( false ); - session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); - + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); //System.out.println("ReadDebug:"+"Awaiting Registration"); session.awaitRegistration(); sessionCreated(session); } - catch( IOException e ) + catch (IOException e) { // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } @@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { MultiThreadSocketSessionImpl session; - synchronized(newSessions) + synchronized (newSessions) { session = (MultiThreadSocketSessionImpl) newSessions.peek(); } @@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { ch.configureBlocking(false); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessionsSet.add(session); } @@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } } } - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException { MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized(newSessions) + synchronized (newSessions) { if (!session.created()) { @@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated( session ); + session.getServiceListeners().fireSessionCreated(session); session.doneCreation(); } @@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void doRemove() { - if( removingSessions.isEmpty() ) + if (removingSessions.isEmpty()) { return; } - for( ; ; ) + for (; ;) { MultiThreadSocketSessionImpl session; - synchronized( removingSessions ) + synchronized (removingSessions) { session = (MultiThreadSocketSessionImpl) removingSessions.pop(); } - if( session == null ) + if (session == null) { break; } @@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // (In case that Session.close() is called before addSession() is processed) if (key == null || writeKey == null) { - scheduleRemove( session ); + scheduleRemove(session); break; } // skip if channel is already closed @@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor try { //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized(readLock) + synchronized (readLock) { key.cancel(); } - synchronized(writeLock) + synchronized (writeLock) { writeKey.cancel(); } ch.close(); } - catch( IOException e ) + catch (IOException e) { - session.getFilterChain().fireExceptionCaught( session, e ); + session.getFilterChain().fireExceptionCaught(session, e); } finally { - releaseWriteBuffers( session ); - session.getServiceListeners().fireSessionDestroyed( session ); + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); } } } @@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { Iterator it = selectedKeys.iterator(); - while( it.hasNext() ) + while (it.hasNext()) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = (SelectionKey) it.next(); MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - synchronized(readLock) + synchronized (readLock) { if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) { - read( session ); + read(session); } } @@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = (SelectionKey) it.next(); SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - synchronized(writeLock) + synchronized (writeLock) { if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) { @@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor // Clear OP_WRITE key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { flushingSessions.offer(session); } @@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int totalReadBytes = 0; - for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;) + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) { ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); SocketChannel ch = session.getChannel(); @@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; } finally { @@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor { lastIdleReadCheckTime = currentTime; Set keys = selector.keys(); - if( keys != null ) + if (keys != null) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for (Iterator it = keys.iterator(); it.hasNext();) { - SelectionKey key = ( SelectionKey ) it.next(); - SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); notifyReadIdleness(session, currentTime); } } @@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyReadIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ), - IdleStatus.BOTH_IDLE, - Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis( IdleStatus.READER_IDLE ), - IdleStatus.READER_IDLE, - Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) ); + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyWriteTimeout(session, currentTime, session .getWriteTimeoutInMillis(), session.getLastWriteTime()); @@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) { notifyIdleness0( - session, currentTime, + session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleness0( session, currentTime, - session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ), - IdleStatus.WRITER_IDLE, - Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) ); + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - notifyWriteTimeout( session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime() ); + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); } - private void notifyIdleness0( SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime ) + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { - if( idleTime > 0 && lastIoTime != 0 - && ( currentTime - lastIoTime ) >= idleTime ) + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) { - session.increaseIdleCount( status ); - session.getFilterChain().fireSessionIdle( session, status ); + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); } } - private void notifyWriteTimeout( SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime ) + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) { MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; SelectionKey key = sesh.getWriteSelectionKey(); - synchronized(writeLock) - { - if( writeTimeout > 0 - && ( currentTime - lastIoTime ) >= writeTimeout - && key != null && key.isValid() - && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 ) + synchronized (writeLock) { - session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() ); + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } } } - } private SocketSessionImpl getNextFlushingSession() { @@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor private void releaseSession(SocketSessionImpl session) { - synchronized(session.getWriteRequestQueue()) + synchronized (session.getWriteRequestQueue()) { - synchronized(flushingSessionsSet) + synchronized (flushingSessionsSet) { if (session.getScheduledWriteRequests() > 0) { @@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor WriteRequest req; //Should this be synchronized? - synchronized(writeRequestQueue) + synchronized (writeRequestQueue) { while ((req = (WriteRequest) writeRequestQueue.pop()) != null) { @@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) { - if( !session.isConnected() ) + if (!session.isConnected()) { - releaseWriteBuffers( session ); + releaseWriteBuffers(session); releaseSession(session); continue; } @@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor SelectionKey key = session.getWriteSelectionKey(); // Retry later if session is not yet fully initialized. // (In case that Session.write() is called before addSession() is processed) - if( key == null ) + if (key == null) { - scheduleFlush( session ); + scheduleFlush(session); releaseSession(session); continue; } // skip if channel is already closed - if( !key.isValid() ) + if (!key.isValid()) { releaseSession(session); continue; @@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor releaseSession(session); } } - catch( IOException e ) + catch (IOException e) { releaseSession(session); - scheduleRemove( session ); - session.getFilterChain().fireExceptionCaught( session, e ); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); } } @@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; // Clear OP_WRITE SelectionKey key = session.getWriteSelectionKey(); - synchronized(writeLock) + synchronized (writeLock) { - key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } SocketChannel ch = session.getChannel(); Queue writeRequestQueue = session.getWriteRequestQueue(); long totalFlushedBytes = 0; - for( ; ; ) + while (true) { WriteRequest req; - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { - req = ( WriteRequest ) writeRequestQueue.first(); + req = (WriteRequest) writeRequestQueue.first(); } - if( req == null ) + if (req == null) { break; } - ByteBuffer buf = ( ByteBuffer ) req.getMessage(); - if( buf.remaining() == 0 ) + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) { - synchronized( writeRequestQueue ) + synchronized (writeRequestQueue) { writeRequestQueue.pop(); } @@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor session.increaseWrittenMessages(); buf.reset(); - session.getFilterChain().fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent(session, req); continue; } @@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor int writtenBytes = 0; // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. -// if (key.isWritable()) + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) { - try - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - catch (IOException ioe) - { - throw ioe; - } + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; } - if( writtenBytes > 0 ) + if (writtenBytes > 0) { - session.increaseWrittenBytes( writtenBytes ); + session.increaseWrittenBytes(writtenBytes); } if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) @@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor if (writeSelector.keys().isEmpty()) { - synchronized(writeLock) + synchronized (writeLock) { if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) @@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); //System.out.println("ReadDebug:"+"Startup"); - for( ; ; ) + for (; ;) { try { @@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doAddNewReader(); doUpdateTrafficMask(); - if( nKeys > 0 ) + if (nKeys > 0) { //System.out.println("ReadDebug:"+nKeys + " keys from selector"); @@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor doRemove(); notifyReadIdleness(); - if( selector.keys().isEmpty() ) + if (selector.keys().isEmpty()) { - synchronized(readLock) + synchronized (readLock) { - if( selector.keys().isEmpty() && newSessions.isEmpty() ) + if (selector.keys().isEmpty() && newSessions.isEmpty()) { readWorker = null; try { selector.close(); } - catch( IOException e ) + catch (IOException e) { - ExceptionMonitor.getInstance().exceptionCaught( e ); + ExceptionMonitor.getInstance().exceptionCaught(e); } finally { @@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor } } } - catch( Throwable t ) + catch (Throwable t) { - ExceptionMonitor.getInstance().exceptionCaught( t ); + ExceptionMonitor.getInstance().exceptionCaught(t); try { - Thread.sleep( 1000 ); + Thread.sleep(1000); } - catch( InterruptedException e1 ) + catch (InterruptedException e1) { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); + ExceptionMonitor.getInstance().exceptionCaught(e1); } } } diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..16e74b17d2 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +}
\ No newline at end of file diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 911d488f94..69deaa383e 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -243,10 +243,10 @@ <!-- Performance Tests. --> <!-- Transient, P2P Tests --> - <TQCT-Qpid-01>-n TQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-01> - <TQCT-Qpid-02>-n TQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-02> - <TQCL-Qpid-01>-n TQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-01> - <TQCL-Qpid-02>-n TQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-02> + <TQCT-Qpid-01>-n TQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-01> + <TQCT-Qpid-02>-n TQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-02> + <TQCL-Qpid-01>-n TQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-01> + <TQCL-Qpid-02>-n TQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-02> <!-- <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TQC-Qpid-05> --> <!-- <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TQC-Qpid-06> --> @@ -269,10 +269,10 @@ <TQM-Qpid-02-1M>-n TQM-Qpid-02-1M -d10M -s[100] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=100000000</TQM-Qpid-02-1M> <!-- Transient, Pub/Sub Tests --> - <TTCT-Qpid-01>-n TTCT-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-01> - <TTCT-Qpid-02>-n TTCT-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-02> - <TTCL-Qpid-01>-n TTCL-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-01> - <TTCL-Qpid-02>-n TTCL-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-02> + <TTCT-Qpid-01>-n TTCT-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-01> + <TTCT-Qpid-02>-n TTCT-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-02> + <TTCL-Qpid-01>-n TTCL-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-01> + <TTCL-Qpid-02>-n TTCL-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-02> <!-- <TTC-Qpid-05>-n TTC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TTC-Qpid-05> --> <!-- <TTC-Qpid-06>-n TTC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TTC-Qpid-06> --> @@ -295,10 +295,10 @@ <TTM-Qpid-02-1M>-n TTM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationCount=1 rate=0 maxPending=20000000</TTM-Qpid-02-1M> <!-- Persistent, P2P Tests --> - <PQCT-Qpid-01>-n PQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCT-Qpid-01> - <PQCT-Qpid-02>-n PQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCT-Qpid-02> - <PQCL-Qpid-01>-n PQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCL-Qpid-01> - <PQCL-Qpid-02>-n PQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCL-Qpid-02> + <PQCT-Qpid-01>-n PQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCT-Qpid-01> + <PQCT-Qpid-02>-n PQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCT-Qpid-02> + <PQCL-Qpid-01>-n PQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCL-Qpid-01> + <PQCL-Qpid-02>-n PQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCL-Qpid-02> <!-- <PQC-Qpid-05>-n PQC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PQC-Qpid-05> --> <!-- <PQC-Qpid-06>-n PQC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PQC-Qpid-06> --> @@ -321,10 +321,10 @@ <PQM-Qpid-02-1M>-n PQM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=20000000</PQM-Qpid-02-1M> <!-- Persistent, Pub/Sub Tests --> - <PTCT-Qpid-01>-n PTCT-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-01> - <PTCT-Qpid-02>-n PTCT-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-02> - <PTCL-Qpid-01>-n PTCL-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-01> - <PTCL-Qpid-02>-n PTCL-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-02> + <PTCT-Qpid-01>-n PTCT-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-01> + <PTCT-Qpid-02>-n PTCT-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-02> + <PTCL-Qpid-01>-n PTCL-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-01> + <PTCL-Qpid-02>-n PTCL-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-02> <!-- <PTC-Qpid-05>-n PTC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PTC-Qpid-05> --> <!-- <PTC-Qpid-06>-n PTC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PTC-Qpid-06> --> @@ -453,169 +453,6 @@ <TTBL-NA-Qpid-06-09>-n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=900 maxPending=2000000 </TTBL-NA-Qpid-06-09> <TTBL-NA-Qpid-06-10>-n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=1000 maxPending=2000000 </TTBL-NA-Qpid-06-10> - <!-- Benchmarking Tests for P2P. --> - <!-- - Bench mark 1. P2P messaging from 1:1 to 1:32, shared queue, load balancing scenario. - Non-tx, and 1 msg per tx. - Persistent and non-persistent. - No rate limiting. - Message size 256 bytes. - --> - <!-- - <TQBT-Qpid-01-1C> -n TQBT-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-1C> - <TQBT-Qpid-01-2C> -n TQBT-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-2C> - <TQBT-Qpid-01-4C> -n TQBT-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-4C> - <TQBT-Qpid-01-8C> -n TQBT-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-8C> - <TQBT-Qpid-01-16C>-n TQBT-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-16C> - <TQBT-Qpid-01-32C>-n TQBT-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-32C> - - <TQBT-Qpid-02-1C> -n TQBT-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-1C> - <TQBT-Qpid-02-2C> -n TQBT-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-2C> - <TQBT-Qpid-02-4C> -n TQBT-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-4C> - <TQBT-Qpid-02-8C> -n TQBT-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-8C> - <TQBT-Qpid-02-16C>-n TQBT-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-16C> - <TQBT-Qpid-02-32C>-n TQBT-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-32C> - - <PQBT-Qpid-01-1C> -n PQBT-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-1C> - <PQBT-Qpid-01-2C> -n PQBT-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-2C> - <PQBT-Qpid-01-4C> -n PQBT-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-4C> - <PQBT-Qpid-01-8C> -n PQBT-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-8C> - <PQBT-Qpid-01-16C>-n PQBT-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-16C> - <PQBT-Qpid-01-32C>-n PQBT-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-32C> - - <PQBT-Qpid-02-1C> -n PQBT-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-1C> - <PQBT-Qpid-02-2C> -n PQBT-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-2C> - <PQBT-Qpid-02-4C> -n PQBT-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-4C> - <PQBT-Qpid-02-8C> -n PQBT-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-8C> - <PQBT-Qpid-02-16C>-n PQBT-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-16C> - <PQBT-Qpid-02-32C>-n PQBT-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-32C> - - <TQBL-Qpid-01-1C> -n TQBL-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-1C> - <TQBL-Qpid-01-2C> -n TQBL-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-2C> - <TQBL-Qpid-01-4C> -n TQBL-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-4C> - <TQBL-Qpid-01-8C> -n TQBL-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-8C> - <TQBL-Qpid-01-16C>-n TQBL-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-16C> - <TQBL-Qpid-01-32C>-n TQBL-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-32C> - - <TQBL-Qpid-02-1C> -n TQBL-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-1C> - <TQBL-Qpid-02-2C> -n TQBL-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-2C> - <TQBL-Qpid-02-4C> -n TQBL-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-4C> - <TQBL-Qpid-02-8C> -n TQBL-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-8C> - <TQBL-Qpid-02-16C>-n TQBL-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-16C> - <TQBL-Qpid-02-32C>-n TQBL-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-32C> - - <PQBL-Qpid-01-1C> -n PQBL-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-1C> - <PQBL-Qpid-01-2C> -n PQBL-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-2C> - <PQBL-Qpid-01-4C> -n PQBL-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-4C> - <PQBL-Qpid-01-8C> -n PQBL-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-8C> - <PQBL-Qpid-01-16C>-n PQBL-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-16C> - <PQBL-Qpid-01-32C>-n PQBL-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-32C> - - <PQBL-Qpid-02-1C> -n PQBL-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-1C> - <PQBL-Qpid-02-2C> -n PQBL-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-2C> - <PQBL-Qpid-02-4C> -n PQBL-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-4C> - <PQBL-Qpid-02-8C> -n PQBL-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-8C> - <PQBL-Qpid-02-16C>-n PQBL-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-16C> - <PQBL-Qpid-02-32C>-n PQBL-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-32C> - --> - - <!-- - Bench mark 2. P2P messaging 1:1, scaled up from 1 to 32 times. Queues not shared, just one to one throughput. - Non-tx, and 1 msg per tx. - Persistent and non-persistent. - No rate limiting. - Message size from 128 bytes to 1 Meg. - --> - <!-- - <TQBT-Qpid-03-128b>-n TQBT-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-128b> - <TQBT-Qpid-03-256b>-n TQBT-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-256b> - <TQBT-Qpid-03-512b>-n TQBT-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-512b> - <TQBT-Qpid-03-1K>-n TQBT-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-1K> - <TQBT-Qpid-03-5K>-n TQBT-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-5K> - <TQBT-Qpid-03-10K>-n TQBT-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-10K> - <TQBT-Qpid-03-50K>-n TQBT-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-50K> - <TQBT-Qpid-03-100K>-n TQBT-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-100K> - <TQBT-Qpid-03-500K>-n TQBT-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-500K> - <TQBT-Qpid-03-1M>-n TQBT-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-1M> - - <TQBT-Qpid-04-128b>-n TQBT-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-128b> - <TQBT-Qpid-04-256b>-n TQBT-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-256b> - <TQBT-Qpid-04-512b>-n TQBT-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-512b> - <TQBT-Qpid-04-1K>-n TQBT-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-1K> - <TQBT-Qpid-04-5K>-n TQBT-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-5K> - <TQBT-Qpid-04-10K>-n TQBT-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-10K> - <TQBT-Qpid-04-50K>-n TQBT-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-50K> - <TQBT-Qpid-04-100K>-n TQBT-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-100K> - <TQBT-Qpid-04-500K>-n TQBT-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-500K> - <TQBT-Qpid-04-1M>-n TQBT-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-1M> - - <PQBT-Qpid-03-128b>-n PQBT-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-128b> - <PQBT-Qpid-03-256b>-n PQBT-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-256b> - <PQBT-Qpid-03-512b>-n PQBT-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-512b> - <PQBT-Qpid-03-1K>-n PQBT-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-1K> - <PQBT-Qpid-03-5K>-n PQBT-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-5K> - <PQBT-Qpid-03-10K>-n PQBT-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-10K> - <PQBT-Qpid-03-50K>-n PQBT-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-50K> - <PQBT-Qpid-03-100K>-n PQBT-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-100K> - <PQBT-Qpid-03-500K>-n PQBT-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-500K> - <PQBT-Qpid-03-1M>-n PQBT-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-1M> - - <PQBT-Qpid-04-128b>-n PQBT-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-128b> - <PQBT-Qpid-04-256b>-n PQBT-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-256b> - <PQBT-Qpid-04-512b>-n PQBT-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-512b> - <PQBT-Qpid-04-1K>-n PQBT-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-1K> - <PQBT-Qpid-04-5K>-n PQBT-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-5K> - <PQBT-Qpid-04-10K>-n PQBT-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-10K> - <PQBT-Qpid-04-50K>-n PQBT-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-50K> - <PQBT-Qpid-04-100K>-n PQBT-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-100K> - <PQBT-Qpid-04-500K>-n PQBT-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-500K> - <PQBT-Qpid-04-1M>-n PQBT-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-1M> - - <TQBL-Qpid-03-128b>-n TQBL-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-128b> - <TQBL-Qpid-03-256b>-n TQBL-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-256b> - <TQBL-Qpid-03-512b>-n TQBL-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-512b> - <TQBL-Qpid-03-1K>-n TQBL-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-1K> - <TQBL-Qpid-03-5K>-n TQBL-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-5K> - <TQBL-Qpid-03-10K>-n TQBL-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-10K> - <TQBL-Qpid-03-50K>-n TQBL-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-50K> - <TQBL-Qpid-03-100K>-n TQBL-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-100K> - <TQBL-Qpid-03-500K>-n TQBL-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-500K> - <TQBL-Qpid-03-1M>-n TQBL-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-1M> - - <TQBL-Qpid-04-128b>-n TQBL-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-128b> - <TQBL-Qpid-04-256b>-n TQBL-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-256b> - <TQBL-Qpid-04-512b>-n TQBL-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-512b> - <TQBL-Qpid-04-1K>-n TQBL-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-1K> - <TQBL-Qpid-04-5K>-n TQBL-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-5K> - <TQBL-Qpid-04-10K>-n TQBL-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-10K> - <TQBL-Qpid-04-50K>-n TQBL-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-50K> - <TQBL-Qpid-04-100K>-n TQBL-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-100K> - <TQBL-Qpid-04-500K>-n TQBL-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-500K> - <TQBL-Qpid-04-1M>-n TQBL-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-1M> - - <PQBL-Qpid-03-128b>-n PQBL-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-128b> - <PQBL-Qpid-03-256b>-n PQBL-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-256b> - <PQBL-Qpid-03-512b>-n PQBL-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-512b> - <PQBL-Qpid-03-1K>-n PQBL-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-1K> - <PQBL-Qpid-03-5K>-n PQBL-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-5K> - <PQBL-Qpid-03-10K>-n PQBL-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-10K> - <PQBL-Qpid-03-50K>-n PQBL-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-50K> - <PQBL-Qpid-03-100K>-n PQBL-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-100K> - <PQBL-Qpid-03-500K>-n PQBL-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-500K> - <PQBL-Qpid-03-1M>-n PQBL-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-1M> - - <PQBL-Qpid-04-128b>-n PQBL-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-128b> - <PQBL-Qpid-04-256b>-n PQBL-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-256b> - <PQBL-Qpid-04-512b>-n PQBL-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-512b> - <PQBL-Qpid-04-1K>-n PQBL-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-1K> - <PQBL-Qpid-04-5K>-n PQBL-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-5K> - <PQBL-Qpid-04-10K>-n PQBL-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-10K> - <PQBL-Qpid-04-50K>-n PQBL-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-50K> - <PQBL-Qpid-04-100K>-n PQBL-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-100K> - <PQBL-Qpid-04-500K>-n PQBL-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-500K> - <PQBL-Qpid-04-1M>-n PQBL-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-1M> - --> - <!-- Failover Tests. --> <FT-Qpid-01>-n FT-Qpid-01 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-01> <FT-Qpid-02>-n FT-Qpid-02 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failAfterSend=true -o $QPID_WORK/results</FT-Qpid-02> diff --git a/java/pom.xml b/java/pom.xml index c8093965e0..becea34848 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -163,7 +163,6 @@ under the License. <module>management/eclipse-plugin</module> <module>client/example</module> <module>client-java14</module> - </modules> @@ -517,17 +516,17 @@ under the License. <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> - <version>1.0.0</version> + <version>1.0.1</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-filter-ssl</artifactId> - <version>1.0.0</version> + <version>1.0.1</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-java5</artifactId> - <version>1.0.0</version> + <version>1.0.1</version> </dependency> <dependency> <groupId>backport-util-concurrent</groupId> diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index 9629f87d46..624d9c9f3d 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -20,27 +20,16 @@ */ package org.apache.qpid.test; -import junit.extensions.TestSetup; - -import junit.framework.Test; import junit.framework.TestCase; - import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; import javax.naming.Context; import javax.naming.spi.InitialContextFactory; - import java.util.HashMap; import java.util.Hashtable; -import java.util.LinkedList; -import java.util.List; import java.util.Map; public class VMTestCase extends TestCase @@ -89,7 +78,7 @@ public class VMTestCase extends TestCase } env.put("connectionfactory.connection", "amqp://guest:guest@" + _clientID + _virtualhost + "?brokerlist='" - + _brokerlist + "'"); + + _brokerlist + "'"); for (Map.Entry<String, String> c : _connections.entrySet()) { @@ -121,6 +110,12 @@ public class VMTestCase extends TestCase super.tearDown(); } + public int getMessageCount(String queueName) + { + return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1)) + .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount(); + } + public void testDummyinVMTestCase() { // keep maven happy diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java new file mode 100644 index 0000000000..037c8285bc --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -0,0 +1,140 @@ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.NamingException; +import java.util.concurrent.CountDownLatch;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class DupsOkTest extends VMTestCase +{ + + private Queue _queue; + private static final int MSG_COUNT = 9999; + private CountDownLatch _awaitCompletion = new CountDownLatch(1); + + public void setUp() throws Exception + { + super.setUp(); + + _queue = (Queue) _context.lookup("queue"); + + //CreateQueue + ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(_queue); + + for (int count = 1; count <= MSG_COUNT; count++) + { + Message msg = producerSession.createTextMessage("Message " + count); + msg.setIntProperty("count", count); + producer.send(msg); + } + + producerConnection.close(); + } + + public void testDupsOK() throws NamingException, JMSException, InterruptedException + { + //Create Client + Connection clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + clientConnection.start(); + + Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + MessageConsumer consumer = clientSession.createConsumer(_queue); + + consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + if (message == null) + { + fail("Should not get null messages"); + } + + if (message instanceof TextMessage) + { + try + { + /*if (message.getIntProperty("count") == 5000) + { + assertEquals("The queue should have 4999 msgs left", 4999, getMessageCount(_queue.getQueueName())); + }*/ + + if (message.getIntProperty("count") == 9999) + { + assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName())); + + //This is the last message so release test. + _awaitCompletion.countDown(); + } + + } + catch (JMSException e) + { + fail("Unable to get int property 'count'"); + } + } + else + { + fail(""); + } + } + }); + + try + { + _awaitCompletion.await(); + } + catch (InterruptedException e) + { + fail("Unable to wait for test completion"); + throw e; + } + +// consumer.close(); + + clientConnection.close(); + + assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName())); + } + + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java new file mode 100644 index 0000000000..fffe073362 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -0,0 +1,222 @@ +package org.apache.qpid.test.client.failover; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.log4j.Logger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; + +public class FailoverTest extends TestCase implements ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(FailoverTest.class); + + private static final int NUM_BROKERS = 2; + private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'"; + private static final String QUEUE = "queue"; + private static final int NUM_MESSAGES = 10; + private Connection con; + private AMQConnectionFactory conFactory; + private Session prodSess; + private AMQQueue q; + private MessageProducer prod; + private Session conSess; + private MessageConsumer consumer; + + private static int usedBrokers = 0; + private CountDownLatch failoverComplete; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + // Create two VM brokers + + for (int i = 0; i < NUM_BROKERS; i++) + { + usedBrokers++; + + TransportConnection.createVMBroker(usedBrokers); + } + //undo last addition + + conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers)); + _logger.info("Connecting on:" + conFactory.getConnectionURL()); + con = conFactory.createConnection(); + ((AMQConnection) con).setConnectionListener(this); + con.start(); + failoverComplete = new CountDownLatch(1); + } + + private void init(boolean transacted, int mode) throws JMSException + { + prodSess = con.createSession(transacted, mode); + q = new AMQQueue("amq.direct", QUEUE); + prod = prodSess.createProducer(q); + conSess = con.createSession(transacted, mode); + consumer = conSess.createConsumer(q); + } + + @Override + protected void tearDown() throws Exception + { + try + { + con.close(); + } + catch (Exception e) + { + + } + + try + { + TransportConnection.killAllVMBrokers(); + ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + super.tearDown(); + } + + private void consumeMessages(int toConsume) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + prod.send(prodSess.createTextMessage("message " + i)); + } + +// try +// { +// Thread.sleep(100 * totalMessages); +// } +// catch (InterruptedException e) +// { +// //evil ignoring of IE +// } + } + + public void testP2PFailover() throws Exception + { + testP2PFailover(NUM_MESSAGES, true); + } + + public void testP2PFailoverWithMessagesLeft() throws Exception + { + testP2PFailover(NUM_MESSAGES, false); + } + + private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException + { + Message msg = null; + init(false, Session.AUTO_ACKNOWLEDGE); + sendMessages(totalMessages); + + // Consume some messages + int toConsume = totalMessages; + if (!consumeAll) + { + toConsume = totalMessages / 2; + } + + consumeMessages(toConsume); + + _logger.info("Failing over"); + + causeFailure(); + + msg = consumer.receive(500); + //todo: reinstate + assertNull("Should not have received message from new broker!", msg); + // Check that messages still sent / received + sendMessages(totalMessages); + consumeMessages(totalMessages); + } + + private void causeFailure() + { + _logger.info("Failover"); + + TransportConnection.killVMBroker(usedBrokers - 1); + ApplicationRegistry.remove(usedBrokers - 1); + + _logger.info("Awaiting Failover completion"); + try + { + failoverComplete.await(); + } + catch (InterruptedException e) + { + //evil ignore IE. + } + } + + public void testClientAckFailover() throws Exception + { + init(false, Session.CLIENT_ACKNOWLEDGE); + sendMessages(1); + Message msg = consumer.receive(); + assertNotNull("Expected msgs not received", msg); + + + causeFailure(); + + Exception failure = null; + try + { + msg.acknowledge(); + } + catch (Exception e) + { + failure = e; + } + assertNotNull("Exception should be thrown", failure); + } + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + failoverComplete.countDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java new file mode 100644 index 0000000000..f83e6e51cb --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.test.unit.ack; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.VMTestCase; + +public class AcknowledgeTest extends VMTestCase +{ + private static final int NUM_MESSAGES = 50; + private Connection _con; + private Queue _queue; + private MessageProducer _producer; + private Session _producerSession; + private Session _consumerSession; + private MessageConsumer _consumerA; + private MessageConsumer _consumerB; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _queue = (Queue) _context.lookup("queue"); + + //CreateQueue + ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + _con = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + _con.start(); + } + + private void init(boolean transacted, int mode) throws JMSException { + _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerSession = _con.createSession(transacted, mode); + _producer = _producerSession.createProducer(_queue); + _consumerA = _consumerSession.createConsumer(_queue); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + try + { + TransportConnection.killAllVMBrokers(); + ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + + } + + private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + _producer.send(_producerSession.createTextMessage("message " + i)); + } + } + + private void testMessageAck(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + sendMessages(NUM_MESSAGES/2); + Thread.sleep(1500); + _consumerB = _consumerSession.createConsumer(_queue); + sendMessages(NUM_MESSAGES/2); + int count = 0; + Message msg = _consumerB.receive(100); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + count++; + msg = _consumerB.receive(1500); + } + if (transacted) + { + _consumerSession.commit(); + } + _consumerA.close(); + _consumerB.close(); + _consumerSession.close(); + assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName())); + } + + public void test2ConsumersAutoAck() throws Exception + { + testMessageAck(false, Session.AUTO_ACKNOWLEDGE); + } + + public void test2ConsumersClientAck() throws Exception + { + testMessageAck(true, Session.CLIENT_ACKNOWLEDGE); + } + + public void test2ConsumersTx() throws Exception + { + testMessageAck(true, Session.AUTO_ACKNOWLEDGE); + } + +} |
