summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/etc/config.xml1
-rw-r--r--java/broker/pom.xml2
-rwxr-xr-xjava/broker/python-test.xml4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java36
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java61
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java115
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java73
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java36
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java3
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java276
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java151
-rw-r--r--java/perftests/pom.xml195
-rw-r--r--java/pom.xml7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java21
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java140
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java222
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java151
29 files changed, 1287 insertions, 588 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 737c8d22c4..b3b6a2877f 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -175,3 +175,4 @@
</broker>
+
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index e2d7e6b9a3..b27ab657b7 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -248,7 +248,7 @@
</condition>
<property name="command"
- value="python run-tests -v -I java_failing.txt -b localhost:2100"/>
+ value="python run-tests -v -I java_failing.txt -b localhost:2110"/>
<!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
<ant antfile="python-test.xml" inheritRefs="true">
diff --git a/java/broker/python-test.xml b/java/broker/python-test.xml
index e58ae399b9..5c263e3169 100755
--- a/java/broker/python-test.xml
+++ b/java/broker/python-test.xml
@@ -43,9 +43,9 @@ under the License.
>
<arg value="${command}"/>
<arg value="-p"/>
- <arg value="2100"/>
+ <arg value="2110"/>
<arg value="-m"/>
- <arg value="2101"/>
+ <arg value="2111"/>
<classpath refid="maven.test.classpath"/>
<sysproperty key="QPID_HOME" value="${broker.dir}"/>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 9fb3a5040b..4696ec4453 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -226,7 +226,7 @@ public class AMQChannel
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
//fixme: fudge for QPID-677
properties.getHeaders().keySet();
-
+
properties.setUserId(protocolSession.getAuthorizedID().getName());
}
@@ -378,7 +378,14 @@ public class AMQChannel
{
_txnContext.rollback();
unsubscribeAllConsumers(session);
- requeue();
+ try
+ {
+ requeue();
+ }
+ catch (AMQException e)
+ {
+ _log.error("Caught AMQException whilst attempting to reque:" + e);
+ }
setClosing(true);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 8ede553464..1a9dc6673a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -53,7 +53,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
{
_exchangeClassMap.put(type.getName(), type);
}
-
+
public Collection<ExchangeType<? extends Exchange>> getRegisteredTypes()
{
return _exchangeClassMap.values();
@@ -75,6 +75,12 @@ public class DefaultExchangeFactory implements ExchangeFactory
public void initialise(Configuration hostConfig)
{
+
+ if (hostConfig == null)
+ {
+ return;
+ }
+
for(Object className : hostConfig.getList("custom-exchanges.class-name"))
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index fa9d83cd7e..543e043bed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -165,7 +165,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
//fixme -- this can be null
if (amqProtocolSession != null)
{
- amqProtocolSession.closeSession();
+ try
+ {
+ amqProtocolSession.closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Caught AMQException whilst closingSession:" + e);
+ }
}
}
@@ -199,7 +206,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
else if (throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22fa0fab23..455983c6d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.registry;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* An abstract application registry that provides access to configuration information and handles the
* construction and caching of configurable objects.
@@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public void run()
{
_logger.info("Shutting down application registries...");
- try
- {
- synchronized (ApplicationRegistry.class)
- {
- Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
-
- while (keyIterator.hasNext())
- {
- IApplicationRegistry instance = keyIterator.next();
-
- instance.close();
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Error shutting down message store: " + e, e);
- }
+ removeAll();
}
}
@@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
catch (Exception e)
{
+ _logger.error("Error shutting down message store: " + e, e);
}
finally
@@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ public static void removeAll()
+ {
+ Object[] keys = _instanceMap.keySet().toArray();
+ for (Object k : keys)
+ {
+ remove((Integer) k);
+ }
+ }
protected ApplicationRegistry(Configuration configuration)
{
@@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
catch (Exception e)
{
_logger.error("Error configuring application: " + e, e);
- //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
+ //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
throw new RuntimeException("Unable to create Application Registry", e);
}
}
@@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public void close() throws Exception
{
- for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
{
virtualHost.close();
}
@@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return instance;
}
-
public static void setDefaultApplicationRegistry(String clazz)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 8ccb0be0a8..7a6e0b011f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,27 +20,26 @@
*/
package org.apache.qpid.server.store;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore
protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
public void configure()
{
@@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
+ _closed.getAndSet(true);
if (_metaDataMap != null)
{
_metaDataMap.clear();
@@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore
}
}
- public void removeMessage(StoreContext context, Long messageId)
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
if (_log.isDebugEnabled())
{
_log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- if(bodyList == null && lastContentBody)
+ if (bodyList == null && lastContentBody)
{
_contentBodyMap.put(messageId, Collections.singletonList(contentBody));
}
@@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
throws AMQException
{
+ checkNotClosed();
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
return _metaDataMap.get(messageId);
}
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
}
+
+ private void checkNotClosed() throws MessageStoreClosedException
+ {
+ if (_closed.get())
+ {
+ throw new MessageStoreClosedException();
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
new file mode 100644
index 0000000000..3d1538c7eb
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException but
+ * we should be using AMQExceptions internally in the code base for Protocol errors hence
+ * the message store interface should throw a different super class which this should be
+ * moved to reflect
+ */
+public class MessageStoreClosedException extends AMQException
+{
+ public MessageStoreClosedException()
+ {
+ super("Message store closed");
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
index 02386e84eb..b6badff24d 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
@@ -3,7 +3,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -252,4 +252,3 @@ public class Client implements MessageListener
new Client();
}
}
-
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 38325a1e41..39b3b80e74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1303,4 +1303,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
}
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 15c113a05d..42f07f97f9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -83,6 +83,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -219,6 +220,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private Map<AMQShortString, BasicMessageConsumer> _consumers =
new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ /**
+ * Contains a list of consumers which have been removed but which might still have
+ * messages to acknowledge, eg in client ack or transacted modes
+ */
+ private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -387,7 +394,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @throws IllegalStateException If the session is closed.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws JMSException
{
if (isClosed())
{
@@ -611,20 +618,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
-// i.next().acknowledgeLastDelivered();
-// }
-
- // get next acknowledgement to server
- Long next = i.next().getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ i.next().acknowledgeDelivered();
}
- if (lastTag != -1)
+ if (_transacted)
{
- acknowledgeMessage(lastTag, true);
+ // Do the above, but for consumers which have been de-registered since the
+ // last commit
+ for (int i = 0; i < _removedConsumers.size(); i++)
+ {
+ // Sends acknowledgement to server
+ _removedConsumers.get(i).acknowledgeDelivered();
+ _removedConsumers.remove(i);
+ }
}
// Commits outstanding messages sent and outstanding acknowledgements.
@@ -760,7 +766,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -776,7 +782,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1676,6 +1682,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_destinationConsumerCount.remove(dest);
}
}
+
+ // Consumers that are closed in a transaction must be stored
+ // so that messages they have received can be acknowledged on commit
+ if (_transacted)
+ {
+ _removedConsumers.add(consumer);
+ }
}
}
@@ -2445,6 +2458,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator it = consumers.iterator(); it.hasNext();)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ consumer.failedOver();
registerConsumer(consumer, true);
}
}
@@ -2543,17 +2557,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _messageDeliveryLock;
}
- /**
- * Signifies that the session has pending sends to commit.
- */
+ /** Signifies that the session has pending sends to commit. */
public void markDirty()
{
_dirty = true;
}
- /**
- * Signifies that the session has no pending sends to commit.
- */
+ /** Signifies that the session has no pending sends to commit. */
public void markClean()
{
_dirty = false;
@@ -2562,6 +2572,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Check to see if failover has occured since the last call to markClean(commit or rollback).
+ *
* @return boolean true if failover has occured.
*/
public boolean hasFailedOver()
@@ -2571,6 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Check to see if any message have been sent in this transaction and have not been commited.
+ *
* @return boolean true if a message has been sent but not commited
*/
public boolean isDirty()
@@ -2624,7 +2636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Reject messages on pre-receive queue
- consumer.rollback();
+ consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
@@ -2668,6 +2680,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
+ for (int i = 0; i < _removedConsumers.size(); i++)
+ {
+ // Sends acknowledgement to server
+ _removedConsumers.get(i).rollback();
+ _removedConsumers.remove(i);
+ }
+
setConnectionStopped(isStopped);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ae31f5ebdd..610e0109b1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -255,6 +253,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
_unacknowledgedDeliveryTags.add(msg.getDeliveryTag());
@@ -277,8 +279,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.setInRecovery(false);
}
- private void acquireReceiving() throws JMSException
+ /**
+ * @param immediate if true then return immediately if the connection is failing over
+ *
+ * @return boolean if the acquisition was successful
+ *
+ * @throws JMSException
+ * @throws InterruptedException
+ */
+ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
+ if (_connection.isFailingOver())
+ {
+ if (immediate)
+ {
+ return false;
+ }
+ else
+ {
+ _connection.blockUntilNotFailingOver();
+ }
+ }
+
if (!_receiving.compareAndSet(false, true))
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -290,6 +312,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
_receivingThread = Thread.currentThread();
+ return true;
}
private void releaseReceiving()
@@ -343,7 +366,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ acquireReceiving(false);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
_session.startDistpatcherIfNecessary();
@@ -424,7 +458,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ if (!acquireReceiving(true))
+ {
+ //If we couldn't acquire the receiving thread then return null.
+ // This will occur if failing over.
+ return null;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ /*
+ * This seems slightly shoddy but should never actually be executed
+ * since we told acquireReceiving to return immediately and it shouldn't
+ * block on anything.
+ */
+
+ return null;
+ }
_session.startDistpatcherIfNecessary();
@@ -721,12 +773,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
break;
case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
+ /*( if (++_outstanding >= _prefetchHigh)
{
_dups_ok_acknowledge_send = true;
}
- if (_outstanding <= _prefetchLow)
+ //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur.
+ if (_outstanding < _prefetchLow)
{
_dups_ok_acknowledge_send = false;
}
@@ -736,11 +789,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (!_session.isInRecovery())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ _outstanding = 0;
}
}
break;
-
+ */
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
@@ -777,20 +831,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/** Acknowledge up to last message delivered (if any). Used when commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}
@@ -866,11 +911,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() // throws JMSException
+ public void acknowledge() throws JMSException
{
- if (!isClosed())
+ if (isClosed())
+ {
+ throw new IllegalStateException("Consumer is closed");
+ }
+ else if (_session.hasFailedOver())
+ {
+ throw new JMSException("has failed over");
+ }
+ else
{
-
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
while (tags.hasNext())
{
@@ -878,10 +930,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
tags.remove();
}
}
- else
- {
- throw new IllegalStateException("Consumer is closed");
- }
}
/** Called on recovery to reset the list of delivery tags */
@@ -951,7 +999,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- // rollback pending messages
+ rollbackPendingMessages();
+ }
+
+ public void rollbackPendingMessages()
+ {
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
@@ -1016,4 +1068,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_synchronousQueue.clear();
}
+
+ /** to be called when a failover has occured */
+ public void failedOver()
+ {
+ clearReceiveQueue();
+ clearUnackedMessages();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 1badbb601c..2b63475d71 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -294,18 +294,4 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
}
- public boolean equals(Object o)
- {
-
- if (o instanceof BlockingMethodFrameListener)
- {
- BlockingMethodFrameListener other = (BlockingMethodFrameListener) o;
-
- return _channelId == other._channelId;
- }
-
- return false;
- }
-
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index 4a4f4a0a38..c66603b7a0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -41,17 +41,4 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
return _expectedClass.isInstance(frame);
}
- public boolean equals(Object o)
- {
- if (o instanceof SpecificMethodFrameListener)
- {
- SpecificMethodFrameListener other = (SpecificMethodFrameListener) o;
-
- // here we need to check if the two classes are the same.
- return (_channelId == other._channelId) && (_expectedClass.equals(other._expectedClass));
- }
-
- return false;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 3257caa796..e8a220f5e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -101,27 +100,21 @@ public class TransportConnection
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
? "Qpid NIO is new default"
: "Sysproperty 'qpidnio' is set"));
-
-
result = new MultiThreadSocketConnector();
}
else
{
_logger.info("Using Mina NIO");
-
result = new SocketConnector(); // non-blocking connector
}
-
// Don't have the connector's worker thread wait around for other connections (we only use
// one SocketConnector per connection at the moment anyway). This allows short-running
// clients (like unit tests) to complete quickly.
result.setWorkerTimeout(0);
-
return result;
}
});
break;
-
case VM:
{
_instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
@@ -280,8 +273,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
- amqbce.initCause(e);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
throw amqbce;
}
@@ -294,14 +286,11 @@ public class TransportConnection
_acceptor.unbindAll();
synchronized (_inVmPipeAddress)
{
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
- {
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
- }
- }
+ _inVmPipeAddress.clear();
+ }
+ _acceptor = null;
+ _currentInstance = -1;
+ _currentVMPort = -1;
}
public static void killVMBroker(int port)
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 25a9e26285..dca6efba67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -22,15 +22,12 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
-
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +46,7 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
- final VmPipeConnector ioConnector = new VmPipeConnector();
+ final VmPipeConnector ioConnector = new QpidVmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
cfg.setThreadModel(ReadWriteThreadModel.getInstance());
diff --git a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
deleted file mode 100644
index 69684a81ea..0000000000
--- a/java/client/src/test/java/org/apache/qpid/client/SpecificMethodFrameListenerTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.qpid.framing;
-
-import junit.framework.TestCase;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-
-import org.apache.mina.common.ByteBuffer;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-public class SpecificMethodFrameListenerTest extends TestCase
-{
-
- SpecificMethodFrameListener close1a = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class);
- SpecificMethodFrameListener close1b = new SpecificMethodFrameListener(1, ChannelCloseOkBody.class);
- SpecificMethodFrameListener close2 = new SpecificMethodFrameListener(2, ChannelCloseOkBody.class);
- SpecificMethodFrameListener open1a = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class);
- SpecificMethodFrameListener open1b = new SpecificMethodFrameListener(1, ChannelOpenOkBody.class);
-
- public void testEquals()
- {
- //Check that the the same objects are equal
- assertEquals("ChannelCloseOKBody a should equal a", close1a, close1a);
- assertEquals("ChannelOpenOkBody a should equal a", open1a, open1a);
-
- //check that the same values in differnt objects are equal
- assertEquals("ChannelCloseOKBody b should equal a", close1b, close1a);
- assertEquals("ChannelCloseOKBody a should equal b", close1a, close1b);
- assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b);
- assertEquals("ChannelOpenOkBody a should equal b", open1a, open1b);
-
- //Chec that different values fail
- //Different channels
- assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close1a.equals(close2));
- assertFalse("ChannelCloseOKBody channel 1 should NOT equal channel 2", close2.equals(close1a));
-
- //Different Bodies
- assertFalse("ChannelCloseOKBody should not equal ChannelOpenOkBody", close1a.equals(open1a));
- assertFalse("ChannelOpenOkBody should not equal ChannelCloseOKBody", open1a.equals(close1a));
- }
-
- public void testProcessMethod() throws AMQFrameDecodingException
- {
- ChannelCloseOkBody ccob = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9).createChannelCloseOkBody();
- ChannelOpenOkBody coob = ((MethodRegistry_0_9)(MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9))).createChannelOpenOkBody(new byte[0]);
-
- assertTrue("This SpecificMethodFrameListener should process a ChannelCloseOkBody", close1a.processMethod(1, ccob));
- assertFalse("This SpecificMethodFrameListener should NOT process a ChannelOpenOkBody", close1a.processMethod(1, coob));
-
-
-
-
- }
-}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 224463a446..e45312448c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -418,14 +418,14 @@ public class CommitRollbackTest extends TestCase
{
_logger.info("Got 2 redelivered, message was prefetched");
_gottwoRedelivered = true;
-
+
}
else
{
- _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
+ _logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
assertFalse("Already received message two", _gottwo);
assertFalse("Already received message redelivered two", _gottwoRedelivered);
-
+
_gottwo = true;
}
}
@@ -437,7 +437,7 @@ public class CommitRollbackTest extends TestCase
* This test sends two messages receives on of them but doesn't ack it.
* The consumer is then closed
* the first message should be returned as redelivered.
- * the second message should be delivered normally.
+ * the second message should be delivered normally.
* @throws Exception
*/
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
@@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase
assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
_logger.info("Closing Consumer");
+
_consumer.close();
_logger.info("Creating New consumer");
@@ -465,33 +466,20 @@ public class CommitRollbackTest extends TestCase
_logger.info("receiving result");
-// NOTE: Both msg 1 & 2 will be marked as redelivered as they have both will have been rejected.
-// Only the occasion where it is not rejected will it mean it hasn't arrived at the client yet.
- result = _consumer.receive(5000);
- assertNotNull("test message was consumed and rolled back, but is gone", result);
-// The first message back will be either 1 or 2 being redelivered
- if (result.getJMSRedelivered())
- {
- assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
- }
- else // or it will be msg 2 arriving the first time due to latency.
- {
- _logger.info("Message 2 wasn't prefetched so wasn't rejected");
- assertEquals("2", ((TextMessage) result).getText());
- }
+ // Message 2 may be marked as redelivered if it was prefetched.
+ result = _consumer.receive(5000);
+ assertNotNull("Second message was not consumed, but is gone", result);
- Message result2 = _consumer.receive(5000);
- assertNotNull("test message was consumed and rolled back, but is gone", result2);
+ // The first message back will be 2, message 1 has been received but not committed
+ // Closing the consumer does not commit the session.
// if this is message 1 then it should be marked as redelivered
- if("1".equals(((TextMessage) result2).getText()))
+ if("1".equals(((TextMessage) result).getText()))
{
- assertTrue("Messasge is not marked as redelivered" + result2, result2.getJMSRedelivered());
+ fail("First message was recieved again");
}
- assertNotSame("Messages should not have the same content",((TextMessage) result2).getText(), ((TextMessage) result).getText() );
-
result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
new file mode 100644
index 0000000000..810d12f472
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends CumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; CumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 598285 $, $Date: 2007-11-26 14:16:01 +0000 (Mon, 26 Nov 2007) $
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = OurCumulativeProtocolDecoder.class
+ .getName()
+ + ".Buffer";
+
+ /**
+ * Creates a new instance.
+ */
+ protected OurCumulativeProtocolDecoder() {
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is NOT compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ boolean usingSessionBuffer = true;
+ ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+ // If we have a session buffer, append data to that; otherwise
+ // use the buffer read from the network directly.
+ if (buf != null) {
+ buf.put(in);
+ buf.flip();
+ } else {
+ buf = in;
+ usingSessionBuffer = false;
+ }
+
+ for (;;) {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buf, out);
+ if (decoded) {
+ if (buf.position() == oldPos) {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.hasRemaining()) {
+ storeRemainingInSession(buf, session);
+ } else {
+ if (usingSessionBuffer)
+ removeSessionBuffer(session);
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose(IoSession session) throws Exception {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session) {
+ ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+ if (buf != null) {
+ buf.release();
+ }
+ }
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+ ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.order(buf.order());
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
index 202ac1a530..cb24102edd 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -376,8 +376,7 @@ public class MultiThreadSocketConnector extends SocketConnector
// Set the ConnectFuture of the specified session, which will be
// removed and notified by AbstractIoFilterChain eventually.
-// session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
- session.setAttribute(AbstractIoFilterChain.class.getName() + ".connectFuture", connectFuture);
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
// Forward the remaining process to the SocketIoProcessor.
session.getIoProcessor().addNew(session);
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
index 11c54bb248..03838ca3f1 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -66,9 +66,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
private volatile Selector selector, writeSelector;
private final Queue newSessions = new Queue();
@@ -90,11 +88,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
this.executor = executor;
}
- void addNew( SocketSessionImpl session ) throws IOException
+ void addNew(SocketSessionImpl session) throws IOException
{
- synchronized( newSessions )
+ synchronized (newSessions)
{
- newSessions.push( session );
+ newSessions.push(session);
}
startupWorker();
@@ -103,16 +101,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
writeSelector.wakeup();
}
- void remove( SocketSessionImpl session ) throws IOException
+ void remove(SocketSessionImpl session) throws IOException
{
- scheduleRemove( session );
+ scheduleRemove(session);
startupWorker();
selector.wakeup();
}
private void startupWorker() throws IOException
{
- synchronized(readLock)
+ synchronized (readLock)
{
if (readWorker == null)
{
@@ -122,7 +120,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeWorker == null)
{
@@ -134,38 +132,38 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
- void flush( SocketSessionImpl session )
+ void flush(SocketSessionImpl session)
{
- scheduleFlush( session );
+ scheduleFlush(session);
Selector selector = this.writeSelector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- void updateTrafficMask( SocketSessionImpl session )
+ void updateTrafficMask(SocketSessionImpl session)
{
- scheduleTrafficControl( session );
+ scheduleTrafficControl(session);
Selector selector = this.selector;
- if( selector != null )
+ if (selector != null)
{
selector.wakeup();
}
}
- private void scheduleRemove( SocketSessionImpl session )
+ private void scheduleRemove(SocketSessionImpl session)
{
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
- removingSessions.push( session );
+ removingSessions.push(session);
}
}
- private void scheduleFlush( SocketSessionImpl session )
+ private void scheduleFlush(SocketSessionImpl session)
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
//if flushingSessions grows to contain Integer.MAX_VALUE sessions
// then this will fail.
@@ -176,31 +174,31 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
- private void scheduleTrafficControl( SocketSessionImpl session )
+ private void scheduleTrafficControl(SocketSessionImpl session)
{
- synchronized( trafficControllingSessions )
+ synchronized (trafficControllingSessions)
{
- trafficControllingSessions.push( session );
+ trafficControllingSessions.push(session);
}
}
private void doAddNewReader() throws InterruptedException
{
- if( newSessions.isEmpty() )
+ if (newSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( newSessions )
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -211,21 +209,20 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
- ch.configureBlocking( false );
- session.setSelectionKey( ch.register( selector,
- SelectionKey.OP_READ,
- session ) );
-
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
//System.out.println("ReadDebug:"+"Awaiting Registration");
session.awaitRegistration();
sessionCreated(session);
}
- catch( IOException e )
+ catch (IOException e)
{
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
@@ -242,7 +239,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
MultiThreadSocketSessionImpl session;
- synchronized(newSessions)
+ synchronized (newSessions)
{
session = (MultiThreadSocketSessionImpl) newSessions.peek();
}
@@ -257,7 +254,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
ch.configureBlocking(false);
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessionsSet.add(session);
}
@@ -275,17 +272,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
-
private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
{
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
- synchronized(newSessions)
+ synchronized (newSessions)
{
if (!session.created())
{
@@ -294,7 +290,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
- session.getServiceListeners().fireSessionCreated( session );
+ session.getServiceListeners().fireSessionCreated(session);
session.doneCreation();
}
@@ -303,21 +299,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void doRemove()
{
- if( removingSessions.isEmpty() )
+ if (removingSessions.isEmpty())
{
return;
}
- for( ; ; )
+ for (; ;)
{
MultiThreadSocketSessionImpl session;
- synchronized( removingSessions )
+ synchronized (removingSessions)
{
session = (MultiThreadSocketSessionImpl) removingSessions.pop();
}
- if( session == null )
+ if (session == null)
{
break;
}
@@ -330,7 +326,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// (In case that Session.close() is called before addSession() is processed)
if (key == null || writeKey == null)
{
- scheduleRemove( session );
+ scheduleRemove(session);
break;
}
// skip if channel is already closed
@@ -342,24 +338,24 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
try
{
//System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
- synchronized(readLock)
+ synchronized (readLock)
{
key.cancel();
}
- synchronized(writeLock)
+ synchronized (writeLock)
{
writeKey.cancel();
}
ch.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- session.getFilterChain().fireExceptionCaught( session, e );
+ session.getFilterChain().fireExceptionCaught(session, e);
}
finally
{
- releaseWriteBuffers( session );
- session.getServiceListeners().fireSessionDestroyed( session );
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
}
}
}
@@ -368,16 +364,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
Iterator it = selectedKeys.iterator();
- while( it.hasNext() )
+ while (it.hasNext())
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = (SelectionKey) it.next();
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
- synchronized(readLock)
+ synchronized (readLock)
{
if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
{
- read( session );
+ read(session);
}
}
@@ -395,7 +391,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = (SelectionKey) it.next();
SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
{
@@ -403,7 +399,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
// Clear OP_WRITE
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
flushingSessions.offer(session);
}
@@ -424,7 +420,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int totalReadBytes = 0;
- for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;)
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
{
ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
SocketChannel ch = session.getChannel();
@@ -482,6 +478,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
}
finally
{
@@ -507,12 +506,12 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
{
lastIdleReadCheckTime = currentTime;
Set keys = selector.keys();
- if( keys != null )
+ if (keys != null)
{
- for( Iterator it = keys.iterator(); it.hasNext(); )
+ for (Iterator it = keys.iterator(); it.hasNext();)
{
- SelectionKey key = ( SelectionKey ) it.next();
- SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
notifyReadIdleness(session, currentTime);
}
}
@@ -542,15 +541,15 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
- IdleStatus.BOTH_IDLE,
- Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
- IdleStatus.READER_IDLE,
- Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
notifyWriteTimeout(session, currentTime, session
.getWriteTimeoutInMillis(), session.getLastWriteTime());
@@ -559,51 +558,51 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
{
notifyIdleness0(
- session, currentTime,
+ session, currentTime,
session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
IdleStatus.BOTH_IDLE,
Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleness0(
session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
- IdleStatus.WRITER_IDLE,
- Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
- notifyWriteTimeout( session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
}
- private void notifyIdleness0( SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime )
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
{
- if( idleTime > 0 && lastIoTime != 0
- && ( currentTime - lastIoTime ) >= idleTime )
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
{
- session.increaseIdleCount( status );
- session.getFilterChain().fireSessionIdle( session, status );
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
}
}
- private void notifyWriteTimeout( SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime )
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
{
MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
SelectionKey key = sesh.getWriteSelectionKey();
- synchronized(writeLock)
- {
- if( writeTimeout > 0
- && ( currentTime - lastIoTime ) >= writeTimeout
- && key != null && key.isValid()
- && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+ synchronized (writeLock)
{
- session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
}
}
- }
private SocketSessionImpl getNextFlushingSession()
{
@@ -612,9 +611,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
private void releaseSession(SocketSessionImpl session)
{
- synchronized(session.getWriteRequestQueue())
+ synchronized (session.getWriteRequestQueue())
{
- synchronized(flushingSessionsSet)
+ synchronized (flushingSessionsSet)
{
if (session.getScheduledWriteRequests() > 0)
{
@@ -642,7 +641,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
WriteRequest req;
//Should this be synchronized?
- synchronized(writeRequestQueue)
+ synchronized (writeRequestQueue)
{
while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
{
@@ -668,9 +667,9 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
{
- if( !session.isConnected() )
+ if (!session.isConnected())
{
- releaseWriteBuffers( session );
+ releaseWriteBuffers(session);
releaseSession(session);
continue;
}
@@ -678,14 +677,14 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
SelectionKey key = session.getWriteSelectionKey();
// Retry later if session is not yet fully initialized.
// (In case that Session.write() is called before addSession() is processed)
- if( key == null )
+ if (key == null)
{
- scheduleFlush( session );
+ scheduleFlush(session);
releaseSession(session);
continue;
}
// skip if channel is already closed
- if( !key.isValid() )
+ if (!key.isValid())
{
releaseSession(session);
continue;
@@ -698,11 +697,11 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
releaseSession(session);
}
}
- catch( IOException e )
+ catch (IOException e)
{
releaseSession(session);
- scheduleRemove( session );
- session.getFilterChain().fireExceptionCaught( session, e );
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
}
}
@@ -714,32 +713,32 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
// Clear OP_WRITE
SelectionKey key = session.getWriteSelectionKey();
- synchronized(writeLock)
+ synchronized (writeLock)
{
- key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
}
SocketChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
long totalFlushedBytes = 0;
- for( ; ; )
+ while (true)
{
WriteRequest req;
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
- req = ( WriteRequest ) writeRequestQueue.first();
+ req = (WriteRequest) writeRequestQueue.first();
}
- if( req == null )
+ if (req == null)
{
break;
}
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
{
- synchronized( writeRequestQueue )
+ synchronized (writeRequestQueue)
{
writeRequestQueue.pop();
}
@@ -747,7 +746,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
session.increaseWrittenMessages();
buf.reset();
- session.getFilterChain().fireMessageSent( session, req );
+ session.getFilterChain().fireMessageSent(session, req);
continue;
}
@@ -755,23 +754,16 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
int writtenBytes = 0;
// Reported as DIRMINA-362
- //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
-// if (key.isWritable())
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
{
- try
- {
- writtenBytes = ch.write(buf.buf());
- totalFlushedBytes += writtenBytes;
- }
- catch (IOException ioe)
- {
- throw ioe;
- }
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
}
- if( writtenBytes > 0 )
+ if (writtenBytes > 0)
{
- session.increaseWrittenBytes( writtenBytes );
+ session.increaseWrittenBytes(writtenBytes);
}
if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
@@ -911,7 +903,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
if (writeSelector.keys().isEmpty())
{
- synchronized(writeLock)
+ synchronized (writeLock)
{
if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
@@ -963,7 +955,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
//System.out.println("ReadDebug:"+"Startup");
- for( ; ; )
+ for (; ;)
{
try
{
@@ -972,7 +964,7 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doAddNewReader();
doUpdateTrafficMask();
- if( nKeys > 0 )
+ if (nKeys > 0)
{
//System.out.println("ReadDebug:"+nKeys + " keys from selector");
@@ -987,21 +979,21 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
doRemove();
notifyReadIdleness();
- if( selector.keys().isEmpty() )
+ if (selector.keys().isEmpty())
{
- synchronized(readLock)
+ synchronized (readLock)
{
- if( selector.keys().isEmpty() && newSessions.isEmpty() )
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
{
readWorker = null;
try
{
selector.close();
}
- catch( IOException e )
+ catch (IOException e)
{
- ExceptionMonitor.getInstance().exceptionCaught( e );
+ ExceptionMonitor.getInstance().exceptionCaught(e);
}
finally
{
@@ -1013,17 +1005,17 @@ class MultiThreadSocketIoProcessor extends SocketIoProcessor
}
}
}
- catch( Throwable t )
+ catch (Throwable t)
{
- ExceptionMonitor.getInstance().exceptionCaught( t );
+ ExceptionMonitor.getInstance().exceptionCaught(t);
try
{
- Thread.sleep( 1000 );
+ Thread.sleep(1000);
}
- catch( InterruptedException e1 )
+ catch (InterruptedException e1)
{
- ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
new file mode 100644
index 0000000000..16e74b17d2
--- /dev/null
+++ b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.vmpipe.support.VmPipe;
+import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
+import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
+import org.apache.mina.util.AnonymousSocketAddress;
+
+/**
+ * Connects to {@link IoHandler}s which is bound on the specified
+ * {@link VmPipeAddress}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class QpidVmPipeConnector extends VmPipeConnector
+{
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
+ private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
+ {
+ public IoSessionConfig getSessionConfig()
+ {
+ return CONFIG;
+ }
+ };
+
+ /**
+ * Creates a new instance.
+ */
+ public QpidVmPipeConnector()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ return connect( address, null, handler, config );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+ if( ! ( address instanceof VmPipeAddress ) )
+ throw new IllegalArgumentException(
+ "address must be VmPipeAddress." );
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
+ if( entry == null )
+ {
+ return DefaultConnectFuture.newFailedFuture(
+ new IOException( "Endpoint unavailable: " + address ) );
+ }
+
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ VmPipeSessionImpl localSession =
+ new VmPipeSessionImpl(
+ this,
+ config,
+ getListeners(),
+ new Object(), // lock
+ new AnonymousSocketAddress(),
+ handler,
+ entry );
+
+ // initialize acceptor session
+ VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
+ try
+ {
+ IoFilterChain filterChain = remoteSession.getFilterChain();
+ entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ entry.getListeners().fireSessionCreated( remoteSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ remoteSession.close();
+ }
+
+
+ // initialize connector session
+ try
+ {
+ IoFilterChain filterChain = localSession.getFilterChain();
+ this.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
+ getListeners().fireSessionCreated( localSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( localSession);
+ }
+ catch( Throwable t )
+ {
+ future.setException( t );
+ }
+
+
+
+ return future;
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+} \ No newline at end of file
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml
index 911d488f94..69deaa383e 100644
--- a/java/perftests/pom.xml
+++ b/java/perftests/pom.xml
@@ -243,10 +243,10 @@
<!-- Performance Tests. -->
<!-- Transient, P2P Tests -->
- <TQCT-Qpid-01>-n TQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-01>
- <TQCT-Qpid-02>-n TQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-02>
- <TQCL-Qpid-01>-n TQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-01>
- <TQCL-Qpid-02>-n TQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-02>
+ <TQCT-Qpid-01>-n TQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-01>
+ <TQCT-Qpid-02>-n TQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCT-Qpid-02>
+ <TQCL-Qpid-01>-n TQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-01>
+ <TQCL-Qpid-02>-n TQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=2000 maxPending=2000000 </TQCL-Qpid-02>
<!-- <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TQC-Qpid-05> -->
<!-- <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TQC-Qpid-06> -->
@@ -269,10 +269,10 @@
<TQM-Qpid-02-1M>-n TQM-Qpid-02-1M -d10M -s[100] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=100000000</TQM-Qpid-02-1M>
<!-- Transient, Pub/Sub Tests -->
- <TTCT-Qpid-01>-n TTCT-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-01>
- <TTCT-Qpid-02>-n TTCT-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-02>
- <TTCL-Qpid-01>-n TTCL-Qpid-01 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-01>
- <TTCL-Qpid-02>-n TTCL-Qpid-02 -d1M -s[10] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-02>
+ <TTCT-Qpid-01>-n TTCT-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-01>
+ <TTCT-Qpid-02>-n TTCT-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCT-Qpid-02>
+ <TTCL-Qpid-01>-n TTCL-Qpid-01 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-01>
+ <TTCL-Qpid-02>-n TTCL-Qpid-02 -d1M -s[10] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=40 maxPending=2000000 </TTCL-Qpid-02>
<!-- <TTC-Qpid-05>-n TTC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TTC-Qpid-05> -->
<!-- <TTC-Qpid-06>-n TTC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </TTC-Qpid-06> -->
@@ -295,10 +295,10 @@
<TTM-Qpid-02-1M>-n TTM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationCount=1 rate=0 maxPending=20000000</TTM-Qpid-02-1M>
<!-- Persistent, P2P Tests -->
- <PQCT-Qpid-01>-n PQCT-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCT-Qpid-01>
- <PQCT-Qpid-02>-n PQCT-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCT-Qpid-02>
- <PQCL-Qpid-01>-n PQCL-Qpid-01 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCL-Qpid-01>
- <PQCL-Qpid-02>-n PQCL-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCL-Qpid-02>
+ <PQCT-Qpid-01>-n PQCT-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCT-Qpid-01>
+ <PQCT-Qpid-02>-n PQCT-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCT-Qpid-02>
+ <PQCL-Qpid-01>-n PQCL-Qpid-01 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=600 maxPending=2000000 </PQCL-Qpid-01>
+ <PQCL-Qpid-02>-n PQCL-Qpid-02 -d1M -s[1000] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=100 maxPending=2000000 </PQCL-Qpid-02>
<!-- <PQC-Qpid-05>-n PQC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PQC-Qpid-05> -->
<!-- <PQC-Qpid-06>-n PQC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PQC-Qpid-06> -->
@@ -321,10 +321,10 @@
<PQM-Qpid-02-1M>-n PQM-Qpid-02-1M -d10M -s[4] -c[8] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=1048576 destinationCount=1 rate=0 maxPending=20000000</PQM-Qpid-02-1M>
<!-- Persistent, Pub/Sub Tests -->
- <PTCT-Qpid-01>-n PTCT-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-01>
- <PTCT-Qpid-02>-n PTCT-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-02>
- <PTCL-Qpid-01>-n PTCL-Qpid-01 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-01>
- <PTCL-Qpid-02>-n PTCL-Qpid-02 -d1M -s[1] -c[1,30],samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-02>
+ <PTCT-Qpid-01>-n PTCT-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-01>
+ <PTCT-Qpid-02>-n PTCT-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCT-Qpid-02>
+ <PTCL-Qpid-01>-n PTCL-Qpid-01 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-01>
+ <PTCL-Qpid-02>-n PTCL-Qpid-02 -d1M -s[1] -c[1:30]:samples=30 -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=1 rate=1 maxPending=2000000 </PTCL-Qpid-02>
<!-- <PTC-Qpid-05>-n PTC-Qpid-05 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PTC-Qpid-05> -->
<!-- <PTC-Qpid-06>-n PTC-Qpid-06 -d10M -s[10] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true uniqueDests=false numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=10 batchSize=1000 messageSize=256 destinationCount=10 rate=0 maxPending=100000 </PTC-Qpid-06> -->
@@ -453,169 +453,6 @@
<TTBL-NA-Qpid-06-09>-n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=900 maxPending=2000000 </TTBL-NA-Qpid-06-09>
<TTBL-NA-Qpid-06-10>-n TTBL-NA-Qpid-06 -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=true uniqueDests=false numConsumers=50 transacted=false consTransacted=false consAckMode=257 commitBatchSize=1 batchSize=1001 messageSize=5120 destinationCount=1 rate=1000 maxPending=2000000 </TTBL-NA-Qpid-06-10>
- <!-- Benchmarking Tests for P2P. -->
- <!--
- Bench mark 1. P2P messaging from 1:1 to 1:32, shared queue, load balancing scenario.
- Non-tx, and 1 msg per tx.
- Persistent and non-persistent.
- No rate limiting.
- Message size 256 bytes.
- -->
- <!--
- <TQBT-Qpid-01-1C> -n TQBT-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-1C>
- <TQBT-Qpid-01-2C> -n TQBT-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-2C>
- <TQBT-Qpid-01-4C> -n TQBT-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-4C>
- <TQBT-Qpid-01-8C> -n TQBT-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-8C>
- <TQBT-Qpid-01-16C>-n TQBT-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-16C>
- <TQBT-Qpid-01-32C>-n TQBT-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-01-32C>
-
- <TQBT-Qpid-02-1C> -n TQBT-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-1C>
- <TQBT-Qpid-02-2C> -n TQBT-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-2C>
- <TQBT-Qpid-02-4C> -n TQBT-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-4C>
- <TQBT-Qpid-02-8C> -n TQBT-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-8C>
- <TQBT-Qpid-02-16C>-n TQBT-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-16C>
- <TQBT-Qpid-02-32C>-n TQBT-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-02-32C>
-
- <PQBT-Qpid-01-1C> -n PQBT-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-1C>
- <PQBT-Qpid-01-2C> -n PQBT-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-2C>
- <PQBT-Qpid-01-4C> -n PQBT-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-4C>
- <PQBT-Qpid-01-8C> -n PQBT-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-8C>
- <PQBT-Qpid-01-16C>-n PQBT-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-16C>
- <PQBT-Qpid-01-32C>-n PQBT-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-01-32C>
-
- <PQBT-Qpid-02-1C> -n PQBT-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-1C>
- <PQBT-Qpid-02-2C> -n PQBT-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-2C>
- <PQBT-Qpid-02-4C> -n PQBT-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-4C>
- <PQBT-Qpid-02-8C> -n PQBT-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-8C>
- <PQBT-Qpid-02-16C>-n PQBT-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-16C>
- <PQBT-Qpid-02-32C>-n PQBT-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-02-32C>
-
- <TQBL-Qpid-01-1C> -n TQBL-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-1C>
- <TQBL-Qpid-01-2C> -n TQBL-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-2C>
- <TQBL-Qpid-01-4C> -n TQBL-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-4C>
- <TQBL-Qpid-01-8C> -n TQBL-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-8C>
- <TQBL-Qpid-01-16C>-n TQBL-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-16C>
- <TQBL-Qpid-01-32C>-n TQBL-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-01-32C>
-
- <TQBL-Qpid-02-1C> -n TQBL-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-1C>
- <TQBL-Qpid-02-2C> -n TQBL-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-2C>
- <TQBL-Qpid-02-4C> -n TQBL-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-4C>
- <TQBL-Qpid-02-8C> -n TQBL-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-8C>
- <TQBL-Qpid-02-16C>-n TQBL-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-16C>
- <TQBL-Qpid-02-32C>-n TQBL-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-02-32C>
-
- <PQBL-Qpid-01-1C> -n PQBL-Qpid-01-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-1C>
- <PQBL-Qpid-01-2C> -n PQBL-Qpid-01-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-2C>
- <PQBL-Qpid-01-4C> -n PQBL-Qpid-01-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-4C>
- <PQBL-Qpid-01-8C> -n PQBL-Qpid-01-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-8C>
- <PQBL-Qpid-01-16C>-n PQBL-Qpid-01-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-16C>
- <PQBL-Qpid-01-32C>-n PQBL-Qpid-01-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-01-32C>
-
- <PQBL-Qpid-02-1C> -n PQBL-Qpid-02-1C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-1C>
- <PQBL-Qpid-02-2C> -n PQBL-Qpid-02-2C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=2 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-2C>
- <PQBL-Qpid-02-4C> -n PQBL-Qpid-02-4C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=4 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-4C>
- <PQBL-Qpid-02-8C> -n PQBL-Qpid-02-8C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=8 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-8C>
- <PQBL-Qpid-02-16C>-n PQBL-Qpid-02-16C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=16 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-16C>
- <PQBL-Qpid-02-32C>-n PQBL-Qpid-02-32C -d1M -s[1000] -c[1] -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=32 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-02-32C>
- -->
-
- <!--
- Bench mark 2. P2P messaging 1:1, scaled up from 1 to 32 times. Queues not shared, just one to one throughput.
- Non-tx, and 1 msg per tx.
- Persistent and non-persistent.
- No rate limiting.
- Message size from 128 bytes to 1 Meg.
- -->
- <!--
- <TQBT-Qpid-03-128b>-n TQBT-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-128b>
- <TQBT-Qpid-03-256b>-n TQBT-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-256b>
- <TQBT-Qpid-03-512b>-n TQBT-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-512b>
- <TQBT-Qpid-03-1K>-n TQBT-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-1K>
- <TQBT-Qpid-03-5K>-n TQBT-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-5K>
- <TQBT-Qpid-03-10K>-n TQBT-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-10K>
- <TQBT-Qpid-03-50K>-n TQBT-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-50K>
- <TQBT-Qpid-03-100K>-n TQBT-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-100K>
- <TQBT-Qpid-03-500K>-n TQBT-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-500K>
- <TQBT-Qpid-03-1M>-n TQBT-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-03-1M>
-
- <TQBT-Qpid-04-128b>-n TQBT-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-128b>
- <TQBT-Qpid-04-256b>-n TQBT-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-256b>
- <TQBT-Qpid-04-512b>-n TQBT-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-512b>
- <TQBT-Qpid-04-1K>-n TQBT-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-1K>
- <TQBT-Qpid-04-5K>-n TQBT-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-5K>
- <TQBT-Qpid-04-10K>-n TQBT-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-10K>
- <TQBT-Qpid-04-50K>-n TQBT-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-50K>
- <TQBT-Qpid-04-100K>-n TQBT-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-100K>
- <TQBT-Qpid-04-500K>-n TQBT-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-500K>
- <TQBT-Qpid-04-1M>-n TQBT-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBT-Qpid-04-1M>
-
- <PQBT-Qpid-03-128b>-n PQBT-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-128b>
- <PQBT-Qpid-03-256b>-n PQBT-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-256b>
- <PQBT-Qpid-03-512b>-n PQBT-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-512b>
- <PQBT-Qpid-03-1K>-n PQBT-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-1K>
- <PQBT-Qpid-03-5K>-n PQBT-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-5K>
- <PQBT-Qpid-03-10K>-n PQBT-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-10K>
- <PQBT-Qpid-03-50K>-n PQBT-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-50K>
- <PQBT-Qpid-03-100K>-n PQBT-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-100K>
- <PQBT-Qpid-03-500K>-n PQBT-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-500K>
- <PQBT-Qpid-03-1M>-n PQBT-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-03-1M>
-
- <PQBT-Qpid-04-128b>-n PQBT-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-128b>
- <PQBT-Qpid-04-256b>-n PQBT-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-256b>
- <PQBT-Qpid-04-512b>-n PQBT-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-512b>
- <PQBT-Qpid-04-1K>-n PQBT-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-1K>
- <PQBT-Qpid-04-5K>-n PQBT-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-5K>
- <PQBT-Qpid-04-10K>-n PQBT-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-10K>
- <PQBT-Qpid-04-50K>-n PQBT-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-50K>
- <PQBT-Qpid-04-100K>-n PQBT-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-100K>
- <PQBT-Qpid-04-500K>-n PQBT-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-500K>
- <PQBT-Qpid-04-1M>-n PQBT-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBT-Qpid-04-1M>
-
- <TQBL-Qpid-03-128b>-n TQBL-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-128b>
- <TQBL-Qpid-03-256b>-n TQBL-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-256b>
- <TQBL-Qpid-03-512b>-n TQBL-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-512b>
- <TQBL-Qpid-03-1K>-n TQBL-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-1K>
- <TQBL-Qpid-03-5K>-n TQBL-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-5K>
- <TQBL-Qpid-03-10K>-n TQBL-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-10K>
- <TQBL-Qpid-03-50K>-n TQBL-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-50K>
- <TQBL-Qpid-03-100K>-n TQBL-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-100K>
- <TQBL-Qpid-03-500K>-n TQBL-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-500K>
- <TQBL-Qpid-03-1M>-n TQBL-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-03-1M>
-
- <TQBL-Qpid-04-128b>-n TQBL-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-128b>
- <TQBL-Qpid-04-256b>-n TQBL-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-256b>
- <TQBL-Qpid-04-512b>-n TQBL-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-512b>
- <TQBL-Qpid-04-1K>-n TQBL-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-1K>
- <TQBL-Qpid-04-5K>-n TQBL-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-5K>
- <TQBL-Qpid-04-10K>-n TQBL-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-10K>
- <TQBL-Qpid-04-50K>-n TQBL-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-50K>
- <TQBL-Qpid-04-100K>-n TQBL-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-100K>
- <TQBL-Qpid-04-500K>-n TQBL-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-500K>
- <TQBL-Qpid-04-1M>-n TQBL-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=false pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </TQBL-Qpid-04-1M>
-
- <PQBL-Qpid-03-128b>-n PQBL-Qpid-03-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-128b>
- <PQBL-Qpid-03-256b>-n PQBL-Qpid-03-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-256b>
- <PQBL-Qpid-03-512b>-n PQBL-Qpid-03-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-512b>
- <PQBL-Qpid-03-1K>-n PQBL-Qpid-03-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-1K>
- <PQBL-Qpid-03-5K>-n PQBL-Qpid-03-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-5K>
- <PQBL-Qpid-03-10K>-n PQBL-Qpid-03-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-10K>
- <PQBL-Qpid-03-50K>-n PQBL-Qpid-03-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-50K>
- <PQBL-Qpid-03-100K>-n PQBL-Qpid-03-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-100K>
- <PQBL-Qpid-03-500K>-n PQBL-Qpid-03-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-500K>
- <PQBL-Qpid-03-1M>-n PQBL-Qpid-03-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-03-1M>
-
- <PQBL-Qpid-04-128b>-n PQBL-Qpid-04-128b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-128b>
- <PQBL-Qpid-04-256b>-n PQBL-Qpid-04-256b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-256b>
- <PQBL-Qpid-04-512b>-n PQBL-Qpid-04-512b -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-512b>
- <PQBL-Qpid-04-1K>-n PQBL-Qpid-04-1K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1024 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-1K>
- <PQBL-Qpid-04-5K>-n PQBL-Qpid-04-5K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=5120 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-5K>
- <PQBL-Qpid-04-10K>-n PQBL-Qpid-04-10K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=10240 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-10K>
- <PQBL-Qpid-04-50K>-n PQBL-Qpid-04-50K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=51200 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-50K>
- <PQBL-Qpid-04-100K>-n PQBL-Qpid-04-100K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=102400 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-100K>
- <PQBL-Qpid-04-500K>-n PQBL-Qpid-04-500K -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=512000 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-500K>
- <PQBL-Qpid-04-1M>-n PQBL-Qpid-04-1M -d1M -s[100] -c[1,32],samples=6,exp -o $QPID_WORK/results -t testPingLatency org.apache.qpid.ping.PingLatencyTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=false consTransacted=false consAckMode=1 commitBatchSize=1 batchSize=100 messageSize=1048576 destinationCount=1 rate=0 maxPending=2000000 </PQBL-Qpid-04-1M>
- -->
-
<!-- Failover Tests. -->
<FT-Qpid-01>-n FT-Qpid-01 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-01>
<FT-Qpid-02>-n FT-Qpid-02 -s[2500] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true properties=failovertest.properties failAfterSend=true -o $QPID_WORK/results</FT-Qpid-02>
diff --git a/java/pom.xml b/java/pom.xml
index c8093965e0..becea34848 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -163,7 +163,6 @@ under the License.
<module>management/eclipse-plugin</module>
<module>client/example</module>
<module>client-java14</module>
-
</modules>
@@ -517,17 +516,17 @@ under the License.
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-java5</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
</dependency>
<dependency>
<groupId>backport-util-concurrent</groupId>
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
index 9629f87d46..624d9c9f3d 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -20,27 +20,16 @@
*/
package org.apache.qpid.test;
-import junit.extensions.TestSetup;
-
-import junit.framework.Test;
import junit.framework.TestCase;
-
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
-
import java.util.HashMap;
import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
public class VMTestCase extends TestCase
@@ -89,7 +78,7 @@ public class VMTestCase extends TestCase
}
env.put("connectionfactory.connection", "amqp://guest:guest@" + _clientID + _virtualhost + "?brokerlist='"
- + _brokerlist + "'");
+ + _brokerlist + "'");
for (Map.Entry<String, String> c : _connections.entrySet())
{
@@ -121,6 +110,12 @@ public class VMTestCase extends TestCase
super.tearDown();
}
+ public int getMessageCount(String queueName)
+ {
+ return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1))
+ .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount();
+ }
+
public void testDummyinVMTestCase()
{
// keep maven happy
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
new file mode 100644
index 0000000000..037c8285bc
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -0,0 +1,140 @@
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import java.util.concurrent.CountDownLatch;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class DupsOkTest extends VMTestCase
+{
+
+ private Queue _queue;
+ private static final int MSG_COUNT = 9999;
+ private CountDownLatch _awaitCompletion = new CountDownLatch(1);
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queue = (Queue) _context.lookup("queue");
+
+ //CreateQueue
+ ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int count = 1; count <= MSG_COUNT; count++)
+ {
+ Message msg = producerSession.createTextMessage("Message " + count);
+ msg.setIntProperty("count", count);
+ producer.send(msg);
+ }
+
+ producerConnection.close();
+ }
+
+ public void testDupsOK() throws NamingException, JMSException, InterruptedException
+ {
+ //Create Client
+ Connection clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ clientConnection.start();
+
+ Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageConsumer consumer = clientSession.createConsumer(_queue);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ if (message == null)
+ {
+ fail("Should not get null messages");
+ }
+
+ if (message instanceof TextMessage)
+ {
+ try
+ {
+ /*if (message.getIntProperty("count") == 5000)
+ {
+ assertEquals("The queue should have 4999 msgs left", 4999, getMessageCount(_queue.getQueueName()));
+ }*/
+
+ if (message.getIntProperty("count") == 9999)
+ {
+ assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName()));
+
+ //This is the last message so release test.
+ _awaitCompletion.countDown();
+ }
+
+ }
+ catch (JMSException e)
+ {
+ fail("Unable to get int property 'count'");
+ }
+ }
+ else
+ {
+ fail("");
+ }
+ }
+ });
+
+ try
+ {
+ _awaitCompletion.await();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to wait for test completion");
+ throw e;
+ }
+
+// consumer.close();
+
+ clientConnection.close();
+
+ assertEquals("The queue should have 0 msgs left", 0, getMessageCount(_queue.getQueueName()));
+ }
+
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
new file mode 100644
index 0000000000..fffe073362
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -0,0 +1,222 @@
+package org.apache.qpid.test.client.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+public class FailoverTest extends TestCase implements ConnectionListener
+{
+ private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+ private static final int NUM_BROKERS = 2;
+ private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
+ private static final String QUEUE = "queue";
+ private static final int NUM_MESSAGES = 10;
+ private Connection con;
+ private AMQConnectionFactory conFactory;
+ private Session prodSess;
+ private AMQQueue q;
+ private MessageProducer prod;
+ private Session conSess;
+ private MessageConsumer consumer;
+
+ private static int usedBrokers = 0;
+ private CountDownLatch failoverComplete;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Create two VM brokers
+
+ for (int i = 0; i < NUM_BROKERS; i++)
+ {
+ usedBrokers++;
+
+ TransportConnection.createVMBroker(usedBrokers);
+ }
+ //undo last addition
+
+ conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
+ _logger.info("Connecting on:" + conFactory.getConnectionURL());
+ con = conFactory.createConnection();
+ ((AMQConnection) con).setConnectionListener(this);
+ con.start();
+ failoverComplete = new CountDownLatch(1);
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException
+ {
+ prodSess = con.createSession(transacted, mode);
+ q = new AMQQueue("amq.direct", QUEUE);
+ prod = prodSess.createProducer(q);
+ conSess = con.createSession(transacted, mode);
+ consumer = conSess.createConsumer(q);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+ super.tearDown();
+ }
+
+ private void consumeMessages(int toConsume) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ prod.send(prodSess.createTextMessage("message " + i));
+ }
+
+// try
+// {
+// Thread.sleep(100 * totalMessages);
+// }
+// catch (InterruptedException e)
+// {
+// //evil ignoring of IE
+// }
+ }
+
+ public void testP2PFailover() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, true);
+ }
+
+ public void testP2PFailoverWithMessagesLeft() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, false);
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+ {
+ Message msg = null;
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(totalMessages);
+
+ // Consume some messages
+ int toConsume = totalMessages;
+ if (!consumeAll)
+ {
+ toConsume = totalMessages / 2;
+ }
+
+ consumeMessages(toConsume);
+
+ _logger.info("Failing over");
+
+ causeFailure();
+
+ msg = consumer.receive(500);
+ //todo: reinstate
+ assertNull("Should not have received message from new broker!", msg);
+ // Check that messages still sent / received
+ sendMessages(totalMessages);
+ consumeMessages(totalMessages);
+ }
+
+ private void causeFailure()
+ {
+ _logger.info("Failover");
+
+ TransportConnection.killVMBroker(usedBrokers - 1);
+ ApplicationRegistry.remove(usedBrokers - 1);
+
+ _logger.info("Awaiting Failover completion");
+ try
+ {
+ failoverComplete.await();
+ }
+ catch (InterruptedException e)
+ {
+ //evil ignore IE.
+ }
+ }
+
+ public void testClientAckFailover() throws Exception
+ {
+ init(false, Session.CLIENT_ACKNOWLEDGE);
+ sendMessages(1);
+ Message msg = consumer.receive();
+ assertNotNull("Expected msgs not received", msg);
+
+
+ causeFailure();
+
+ Exception failure = null;
+ try
+ {
+ msg.acknowledge();
+ }
+ catch (Exception e)
+ {
+ failure = e;
+ }
+ assertNotNull("Exception should be thrown", failure);
+ }
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ failoverComplete.countDown();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
new file mode 100644
index 0000000000..f83e6e51cb
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.test.unit.ack;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+public class AcknowledgeTest extends VMTestCase
+{
+ private static final int NUM_MESSAGES = 50;
+ private Connection _con;
+ private Queue _queue;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageConsumer _consumerA;
+ private MessageConsumer _consumerB;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queue = (Queue) _context.lookup("queue");
+
+ //CreateQueue
+ ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ _con = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ _con.start();
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException {
+ _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = _con.createSession(transacted, mode);
+ _producer = _producerSession.createProducer(_queue);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+
+ }
+
+ private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ _producer.send(_producerSession.createTextMessage("message " + i));
+ }
+ }
+
+ private void testMessageAck(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+ sendMessages(NUM_MESSAGES/2);
+ Thread.sleep(1500);
+ _consumerB = _consumerSession.createConsumer(_queue);
+ sendMessages(NUM_MESSAGES/2);
+ int count = 0;
+ Message msg = _consumerB.receive(100);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ count++;
+ msg = _consumerB.receive(1500);
+ }
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+ _consumerA.close();
+ _consumerB.close();
+ _consumerSession.close();
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName()));
+ }
+
+ public void test2ConsumersAutoAck() throws Exception
+ {
+ testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersClientAck() throws Exception
+ {
+ testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersTx() throws Exception
+ {
+ testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ }
+
+}