diff options
author | Alan Conway <aconway@apache.org> | 2011-04-04 19:19:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-04-04 19:19:28 +0000 |
commit | 4768658bab11f1a9f566f785fba33f6790c0b424 (patch) | |
tree | 9cdd91129e46ed781981419b19eb0f7f262f0d2a | |
parent | 6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea (diff) | |
download | qpid-python-4768658bab11f1a9f566f785fba33f6790c0b424.tar.gz |
Merge branch 'trunk' into qpid-2920
Conflicts:
qpid/cpp/src/tests/qpid-cluster-benchmark
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1088742 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 45 insertions, 137 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp index 38cb8043c9..cd6735328f 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -43,7 +43,6 @@ void RecoveredDequeue::commit() throw() void RecoveredDequeue::rollback() throw() { - msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 6263c63e3d..6d2eaee6c4 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -36,7 +36,6 @@ bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ } void RecoveredEnqueue::commit() throw(){ - msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 9aae39ccc3..9db366fd20 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -258,7 +258,6 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) { - msg->enqueueComplete(); // recoved nmessage to enqueued in store already buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); } diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp index 36a451e62c..9c2cf4a467 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.cpp +++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp @@ -90,14 +90,7 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue) { - if (!queue->enqueue(ctxt, msg)){ - /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ - msg->enqueueComplete(); - } + queue->enqueue(ctxt, msg); } TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){} diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 9d735c20e3..ff787a46dd 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -22,14 +22,13 @@ # Default values PORT="5672" -BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list -BROKER=`echo $HOSTS | awk '{print $1}'` # First broker COUNT=10000 FLOW=100 # Flow control limit on queue depth for latency. REPEAT=10 -SCALE=10 +QUEUES=4 +CLIENTS=3 -while getopts "p:c:f:r:t:b:" opt; do +while getopts "p:c:f:r:t:b:q:c" opt; do case $opt in p) PORT=$OPTARG;; c) COUNT=$OPTARG;; @@ -37,17 +36,22 @@ while getopts "p:c:f:r:t:b:" opt; do r) REPEAT=$OPTARG;; s) SCALE=$OPTARG;; b) BROKERS=$OPTARG;; + q) QUEUES=$OPTARG;; + c) CLIENTS=$OPTARG;; *) echo "Unknown option"; exit 1;; esac done +BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list +BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker + run_test() { echo $*; shift; "$@"; echo; echo; echo; } # Multiple pubs/subs connect via multiple brokers (active-active) -run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT +run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT # Multiple pubs/subs connect via single broker (active-passive) -run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT +run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT # Latency run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW diff --git a/qpid/doc/dev-readme/QPID-Component-README.odg b/qpid/doc/dev-readme/QPID-Component-README.odg Binary files differindex 40f2972d88..217a6d68d8 100644 --- a/qpid/doc/dev-readme/QPID-Component-README.odg +++ b/qpid/doc/dev-readme/QPID-Component-README.odg diff --git a/qpid/doc/dev-readme/QPID-Component-README.pdf b/qpid/doc/dev-readme/QPID-Component-README.pdf Binary files differindex 8863883df9..3012372889 100644 --- a/qpid/doc/dev-readme/QPID-Component-README.pdf +++ b/qpid/doc/dev-readme/QPID-Component-README.pdf diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index de9dc42de8..9b9de8333b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,16 +21,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfig; -import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.management.Managable; @@ -108,23 +105,16 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer boolean isDeleted(); - int delete() throws AMQException; - void requeue(QueueEntry entry); - void requeue(QueueEntryImpl storeContext, Subscription subscription); - void dequeue(QueueEntry entry, Subscription sub); void decrementUnackedMsgCount(); - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); void removeQueueDeleteTask(final Task task); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index edd1e0bdc3..79ede2694e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -195,8 +195,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable boolean isRejectedBy(Subscription subscription); - void requeue(Subscription subscription); - void dequeue(); void dispose(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1ba4f4d89b..809ba3277e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -358,15 +358,6 @@ public class QueueEntryImpl implements QueueEntry } } - public void requeue(Subscription subscription) - { - getQueue().requeue(this, subscription); - if(_stateChangeListeners != null) - { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); - } - } - public void dequeue() { EntryState state = _state; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 44b7c95535..7e1d57e205 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -27,13 +27,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.queue.QueueRunner; import org.apache.qpid.server.queue.SimpleAMQQueue; - +/** + * QueueRunners are Runnables used to process a queue when requiring + * asynchronous message delivery to subscriptions, which is necessary + * when straight-through delivery of a message to a subscription isn't + * possible during the enqueue operation. + */ public class QueueRunner implements ReadWriteRunnable { private static final Logger _logger = Logger.getLogger(QueueRunner.class); - private String _name; - private SimpleAMQQueue _queue; + private final String _name; + private final SimpleAMQQueue _queue; public QueueRunner(SimpleAMQQueue queue, long count) { @@ -53,7 +58,7 @@ public class QueueRunner implements ReadWriteRunnable } catch (AMQException e) { - _logger.error(e); + _logger.error("Exception during asynchronous delivery by " + _name, e); } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4890c00047..b02d03a1ad 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -805,24 +805,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void requeue(QueueEntryImpl entry, Subscription subscription) - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - } - public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 41ca751684..abe2d1728f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -227,10 +227,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a re-queued message is resent to the subscriber. Verifies also that the + * Tests that a released queue entry is resent to the subscriber. Verifies also that the * QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testRequeuedMessageIsResentToSubscriber() throws Exception + public void testReleasedMessageIsResentToSubscriber() throws Exception { _queue.registerSubscription(_subscription, false); @@ -253,19 +253,18 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now requeue the first message only */ + /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -275,11 +274,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a re-queued message that becomes expired is not resent to the subscriber. + * Tests that a released message that becomes expired is not resent to the subscriber. * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception + public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -301,17 +300,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); int subFlushWaitTime = 150; - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - /* Wait a little more to be sure that message will have expired, then requeue it */ + /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); @@ -321,12 +319,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that if a client requeues messages 'out of order' (the order + * Tests that if a client releases entries 'out of order' (the order * used by QueueEntryImpl.compareTo) that messages are still resent * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()} * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. */ - public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception + public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -349,21 +347,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now requeue the third and first message only */ + /* Now release the third and first message only, causing it to be requeued */ queueEntries.get(2).release(); queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(2)); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -374,10 +370,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** - * Tests a requeue for a queue with multiple subscriptions. Verifies that a + * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testRequeueForQueueWithMultipleSubscriptions() throws Exception + public void testReleaseForQueueWithMultipleSubscriptions() throws Exception { MockSubscription subscription1 = new MockSubscription(); MockSubscription subscription2 = new MockSubscription(); @@ -402,66 +398,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); - /* Now requeue a message (for any subscription) */ + /* Now release the first message only, causing it to be requeued */ + queueEntries.get(0).release(); - queueEntries.get(0).release(); - _queue.requeue((QueueEntryImpl)queueEntries.get(0)); - - Thread.sleep(150); // Work done by SubFlushRunner Thread - - assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); - } - - /** - * Tests a requeue for a queue with multiple subscriptions. Verifies that a - * subscriber specific requeue resends the message to <i>that</i> subscriber. - */ - public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception - { - MockSubscription subscription1 = new MockSubscription(); - MockSubscription subscription2 = new MockSubscription(); - - _queue.registerSubscription(subscription1, false); - _queue.registerSubscription(subscription2, false); - - final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; - - AMQMessage messageA = createMessage(new Long(24)); - AMQMessage messageB = createMessage(new Long(25)); - - /* Enqueue two messages */ - - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - - Thread.sleep(150); // Work done by SubFlushRunner Thread - - assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); - - /* Now requeue a message (for first subscription) */ - - queueEntries.get(0).release(); - _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1); - - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 891819c227..d50c9e16fe 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -217,6 +217,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { + _logger.info("Resuming connection"); + getQpidConnection().resume(); List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) |