summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-04 19:19:28 +0000
committerAlan Conway <aconway@apache.org>2011-04-04 19:19:28 +0000
commit4768658bab11f1a9f566f785fba33f6790c0b424 (patch)
tree9cdd91129e46ed781981419b19eb0f7f262f0d2a
parent6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea (diff)
downloadqpid-python-4768658bab11f1a9f566f785fba33f6790c0b424.tar.gz
Merge branch 'trunk' into qpid-2920
Conflicts: qpid/cpp/src/tests/qpid-cluster-benchmark git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1088742 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.cpp9
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark16
-rw-r--r--qpid/doc/dev-readme/QPID-Component-README.odgbin14069 -> 12661 bytes
-rw-r--r--qpid/doc/dev-readme/QPID-Component-README.pdfbin39535 -> 38097 bytes
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java100
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
14 files changed, 45 insertions, 137 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
index 38cb8043c9..cd6735328f 100644
--- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
@@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw()
void RecoveredDequeue::rollback() throw()
{
- msg->enqueueComplete();
queue->process(msg);
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
index 6263c63e3d..6d2eaee6c4 100644
--- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
@@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
}
void RecoveredEnqueue::commit() throw(){
- msg->enqueueComplete();
queue->process(msg);
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 9aae39ccc3..9db366fd20 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -258,7 +258,6 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared
void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
- msg->enqueueComplete(); // recoved nmessage to enqueued in store already
buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
}
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp
index 36a451e62c..9c2cf4a467 100644
--- a/qpid/cpp/src/qpid/broker/TxPublish.cpp
+++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp
@@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
{
- if (!queue->enqueue(ctxt, msg)){
- /**
- * if not store then mark message for ack and deleivery once
- * commit happens, as async IO will never set it when no store
- * exists
- */
- msg->enqueueComplete();
- }
+ queue->enqueue(ctxt, msg);
}
TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 9d735c20e3..ff787a46dd 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -22,14 +22,13 @@
# Default values
PORT="5672"
-BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list
-BROKER=`echo $HOSTS | awk '{print $1}'` # First broker
COUNT=10000
FLOW=100 # Flow control limit on queue depth for latency.
REPEAT=10
-SCALE=10
+QUEUES=4
+CLIENTS=3
-while getopts "p:c:f:r:t:b:" opt; do
+while getopts "p:c:f:r:t:b:q:c" opt; do
case $opt in
p) PORT=$OPTARG;;
c) COUNT=$OPTARG;;
@@ -37,17 +36,22 @@ while getopts "p:c:f:r:t:b:" opt; do
r) REPEAT=$OPTARG;;
s) SCALE=$OPTARG;;
b) BROKERS=$OPTARG;;
+ q) QUEUES=$OPTARG;;
+ c) CLIENTS=$OPTARG;;
*) echo "Unknown option"; exit 1;;
esac
done
+BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
+BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
+
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
# Multiple pubs/subs connect via single broker (active-passive)
-run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
# Latency
run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
diff --git a/qpid/doc/dev-readme/QPID-Component-README.odg b/qpid/doc/dev-readme/QPID-Component-README.odg
index 40f2972d88..217a6d68d8 100644
--- a/qpid/doc/dev-readme/QPID-Component-README.odg
+++ b/qpid/doc/dev-readme/QPID-Component-README.odg
Binary files differ
diff --git a/qpid/doc/dev-readme/QPID-Component-README.pdf b/qpid/doc/dev-readme/QPID-Component-README.pdf
index 8863883df9..3012372889 100644
--- a/qpid/doc/dev-readme/QPID-Component-README.pdf
+++ b/qpid/doc/dev-readme/QPID-Component-README.pdf
Binary files differ
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index de9dc42de8..9b9de8333b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,16 +21,13 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
-import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.management.Managable;
@@ -108,23 +105,16 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
boolean isDeleted();
-
int delete() throws AMQException;
-
void requeue(QueueEntry entry);
- void requeue(QueueEntryImpl storeContext, Subscription subscription);
-
void dequeue(QueueEntry entry, Subscription sub);
void decrementUnackedMsgCount();
-
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
-
void addQueueDeleteTask(final Task task);
void removeQueueDeleteTask(final Task task);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index edd1e0bdc3..79ede2694e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -195,8 +195,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
boolean isRejectedBy(Subscription subscription);
- void requeue(Subscription subscription);
-
void dequeue();
void dispose();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 1ba4f4d89b..809ba3277e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -358,15 +358,6 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void requeue(Subscription subscription)
- {
- getQueue().requeue(this, subscription);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
- }
- }
-
public void dequeue()
{
EntryState state = _state;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 44b7c95535..7e1d57e205 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -27,13 +27,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.QueueRunner;
import org.apache.qpid.server.queue.SimpleAMQQueue;
-
+/**
+ * QueueRunners are Runnables used to process a queue when requiring
+ * asynchronous message delivery to subscriptions, which is necessary
+ * when straight-through delivery of a message to a subscription isn't
+ * possible during the enqueue operation.
+ */
public class QueueRunner implements ReadWriteRunnable
{
private static final Logger _logger = Logger.getLogger(QueueRunner.class);
- private String _name;
- private SimpleAMQQueue _queue;
+ private final String _name;
+ private final SimpleAMQQueue _queue;
public QueueRunner(SimpleAMQQueue queue, long count)
{
@@ -53,7 +58,7 @@ public class QueueRunner implements ReadWriteRunnable
}
catch (AMQException e)
{
- _logger.error(e);
+ _logger.error("Exception during asynchronous delivery by " + _name, e);
}
finally
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4890c00047..b02d03a1ad 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -805,24 +805,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void requeue(QueueEntryImpl entry, Subscription subscription)
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
-
- deliverAsync();
- }
-
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 41ca751684..abe2d1728f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -227,10 +227,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
/**
- * Tests that a re-queued message is resent to the subscriber. Verifies also that the
+ * Tests that a released queue entry is resent to the subscriber. Verifies also that the
* QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
- public void testRequeuedMessageIsResentToSubscriber() throws Exception
+ public void testReleasedMessageIsResentToSubscriber() throws Exception
{
_queue.registerSubscription(_subscription, false);
@@ -253,19 +253,18 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
- /* Now requeue the first message only */
+ /* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
- _queue.requeue(queueEntries.get(0));
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
@@ -275,11 +274,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
/**
- * Tests that a re-queued message that becomes expired is not resent to the subscriber.
+ * Tests that a released message that becomes expired is not resent to the subscriber.
* This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries.
* Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
- public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception
+ public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
_queue.registerSubscription(_subscription, false);
@@ -301,17 +300,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.enqueue(messageA, postEnqueueAction);
int subFlushWaitTime = 150;
- Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+ Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- /* Wait a little more to be sure that message will have expired, then requeue it */
+ /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
queueEntries.get(0).release();
- _queue.requeue(queueEntries.get(0));
- Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread
+ Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
@@ -321,12 +319,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
/**
- * Tests that if a client requeues messages 'out of order' (the order
+ * Tests that if a client releases entries 'out of order' (the order
* used by QueueEntryImpl.compareTo) that messages are still resent
* successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()}
* can correctly move the _releasedEntry to an earlier position in the QueueEntry list.
*/
- public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception
+ public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
{
_queue.registerSubscription(_subscription, false);
@@ -349,21 +347,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
- /* Now requeue the third and first message only */
+ /* Now release the third and first message only, causing it to be requeued */
queueEntries.get(2).release();
queueEntries.get(0).release();
- _queue.requeue(queueEntries.get(2));
- _queue.requeue(queueEntries.get(0));
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
@@ -374,10 +370,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
- * Tests a requeue for a queue with multiple subscriptions. Verifies that a
+ * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
- public void testRequeueForQueueWithMultipleSubscriptions() throws Exception
+ public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
{
MockSubscription subscription1 = new MockSubscription();
MockSubscription subscription2 = new MockSubscription();
@@ -402,66 +398,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
- assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
- /* Now requeue a message (for any subscription) */
+ /* Now release the first message only, causing it to be requeued */
+ queueEntries.get(0).release();
- queueEntries.get(0).release();
- _queue.requeue((QueueEntryImpl)queueEntries.get(0));
-
- Thread.sleep(150); // Work done by SubFlushRunner Thread
-
- assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
- }
-
- /**
- * Tests a requeue for a queue with multiple subscriptions. Verifies that a
- * subscriber specific requeue resends the message to <i>that</i> subscriber.
- */
- public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception
- {
- MockSubscription subscription1 = new MockSubscription();
- MockSubscription subscription2 = new MockSubscription();
-
- _queue.registerSubscription(subscription1, false);
- _queue.registerSubscription(subscription2, false);
-
- final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
- {
- public void onEnqueue(QueueEntry entry)
- {
- queueEntries.add(entry);
- }
- };
-
- AMQMessage messageA = createMessage(new Long(24));
- AMQMessage messageB = createMessage(new Long(25));
-
- /* Enqueue two messages */
-
- _queue.enqueue(messageA, postEnqueueAction);
- _queue.enqueue(messageB, postEnqueueAction);
-
- Thread.sleep(150); // Work done by SubFlushRunner Thread
-
- assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size());
- assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size());
-
- /* Now requeue a message (for first subscription) */
-
- queueEntries.get(0).release();
- _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1);
-
- Thread.sleep(150); // Work done by SubFlushRunner Thread
+ Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size());
- assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 891819c227..d50c9e16fe 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -217,6 +217,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
+ _logger.info("Resuming connection");
+ getQpidConnection().resume();
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
_logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)