diff options
Diffstat (limited to 'qpid/java')
7 files changed, 18 insertions, 357 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 6c439993c7..8328baca3f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -244,6 +244,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); + private final QueueRunner _queueRunner = new QueueRunner(this); + protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { super(parentsMap(virtualHost), attributes); @@ -745,7 +747,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> childAdded(consumer); consumer.addChangeListener(_deletedChildListener); - deliverAsync(consumer); + deliverAsync(); return consumer; @@ -1006,14 +1008,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { checkConsumersNotAheadOfDelivery(entry); - if (exclusiveSub != null) - { - deliverAsync(exclusiveSub); - } - else - { - deliverAsync(); - } + deliverAsync(); } checkForNotification(entry.getMessage()); @@ -1490,7 +1485,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _activeSubscriberCount.incrementAndGet(); } - deliverAsync(sub); + deliverAsync(); } } @@ -1859,8 +1854,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - private final QueueRunner _queueRunner = new QueueRunner(this); - public void deliverAsync() { _stateChangeCount.incrementAndGet(); @@ -1869,20 +1862,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } - public void deliverAsync(QueueConsumer<?> sub) - { - if(_exclusiveSubscriber == null) - { - deliverAsync(); - } - else - { - SubFlushRunner flusher = sub.getRunner(); - flusher.execute(); - } - - } - void flushConsumer(QueueConsumer<?> sub) { @@ -2100,10 +2079,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> * * A queue Runner is started whenever a state change occurs, e.g when a new * message arrives on the queue and cannot be immediately delivered to a - * consumer (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to consumers unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. + * consumer (i.e. asynchronous delivery is required). * * processQueue should be running while there are messages on the queue AND * there are consumers that can deliver them. If there are no @@ -2412,7 +2388,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); - deliverAsync(_sub); + deliverAsync(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 02cd7ff56f..a2c275e797 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -44,8 +44,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, void queueDeleted(); - SubFlushRunner getRunner(); - AMQQueue getQueue(); boolean resend(QueueEntry e); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index b33a72be10..c85e4058a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -86,7 +86,6 @@ class QueueConsumerImpl } private final ConsumerTarget _target; - private final SubFlushRunner _runner = new SubFlushRunner(this); private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener; private volatile QueueContext _queueContext; @@ -210,7 +209,7 @@ class QueueConsumerImpl @Override public void externalStateChange() { - _queue.deliverAsync(this); + _queue.deliverAsync(); } @Override @@ -324,11 +323,6 @@ class QueueConsumerImpl return getQueue().resend(entry, this); } - public final SubFlushRunner getRunner() - { - return _runner; - } - public final long getConsumerNumber() { return _consumerNumber; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java deleted file mode 100755 index c860918a0b..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - - -import java.security.PrivilegedAction; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.security.auth.Subject; - -import org.apache.log4j.Logger; - -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.transport.TransportException; - - -class SubFlushRunner implements Runnable -{ - private static final Logger _logger = Logger.getLogger(SubFlushRunner.class); - - - private final QueueConsumerImpl _sub; - - private static int IDLE = 0; - private static int SCHEDULED = 1; - private static int RUNNING = 2; - - - private final AtomicInteger _scheduled = new AtomicInteger(IDLE); - - - private final AtomicBoolean _stateChange = new AtomicBoolean(); - - public SubFlushRunner(QueueConsumerImpl sub) - { - _sub = sub; - } - - public void run() - { - if(_scheduled.compareAndSet(SCHEDULED, RUNNING)) - { - Subject.doAs(SecurityManager.getSystemTaskSubject("Sub. Delivery"), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - boolean complete = false; - _stateChange.set(false); - try - { - complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries()); - } - catch (ConnectionScopedRuntimeException | TransportException e) - { - final String errorMessage = "Problem during asynchronous delivery by " + toString(); - if(_logger.isDebugEnabled()) - { - _logger.debug(errorMessage, e); - } - else - { - _logger.info(errorMessage + ' ' + e.getMessage()); - } - } - finally - { - _scheduled.compareAndSet(RUNNING, IDLE); - if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended()) - { - if(_scheduled.compareAndSet(IDLE,SCHEDULED)) - { - getQueue().execute(SubFlushRunner.this); - } - } - } - return null; - } - }); - - } - } - - private AbstractQueue<?> getQueue() - { - return (AbstractQueue<?>) _sub.getQueue(); - } - - public String toString() - { - return "SubFlushRunner-" + _sub.toLogString(); - } - - public void execute() - { - _stateChange.set(true); - if(_scheduled.compareAndSet(IDLE,SCHEDULED)) - { - getQueue().execute(this); - } - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 9255dbf42e..c1a9240f2c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -261,7 +261,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to consumer", 3, @@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase queueEntries.get(0).release(); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to consumer", 4, @@ -311,7 +311,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageA, postEnqueueAction); int subFlushWaitTime = 150; - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to consumer", 1, @@ -322,7 +322,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); queueEntries.get(0).release(); - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", @@ -360,7 +360,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to consumer", 3, @@ -374,7 +374,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase queueEntries.get(2).release(); queueEntries.get(0).release(); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to consumer", 5, @@ -417,7 +417,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to both after enqueue", 2, @@ -426,7 +426,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + Thread.sleep(150); // Work done by QueueRunner Thread assertEquals("Unexpected total number of messages sent to both consumers after release", 3, @@ -645,88 +645,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase assertEquals("Message ID was wrong", msgID, 10L); } - - /** - * processQueue() is used when asynchronously delivering messages to - * consumers which could not be delivered immediately during the - * enqueue() operation. - * - * A defect within the method would mean that delivery of these messages may - * not occur should the Runner stop before all messages have been processed. - * Such a defect was discovered when Selectors were used such that one and - * only one consumer can/will accept any given messages, but multiple - * consumers are present, and one of the earlier consumers receives - * more messages than the others. - * - * This test is to validate that the processQueue() method is able to - * correctly deliver all of the messages present for asynchronous delivery - * to consumers in such a scenario. - */ - public void testProcessQueueWithUniqueSelectors() throws Exception - { - AbstractQueue testQueue = createNonAsyncDeliverQueue(); - testQueue.open(); - - // retrieve the QueueEntryList the queue creates and insert the test - // messages, thus avoiding straight-through delivery attempts during - //enqueue() process. - QueueEntryList list = testQueue.getEntries(); - assertNotNull("QueueEntryList should have been created", list); - - QueueEntry msg1 = list.add(createMessage(1L)); - QueueEntry msg2 = list.add(createMessage(2L)); - QueueEntry msg3 = list.add(createMessage(3L)); - QueueEntry msg4 = list.add(createMessage(4L)); - QueueEntry msg5 = list.add(createMessage(5L)); - - // Create lists of the entries each consumer should be interested - // in.Bias over 50% of the messages to the first consumer so that - // the later consumers reject them and report being done before - // the first consumer as the processQueue method proceeds. - List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3); - List<String> msgListSub2 = createEntriesList(msg4); - List<String> msgListSub3 = createEntriesList(msg5); - - MockConsumer sub1 = new MockConsumer(msgListSub1); - MockConsumer sub2 = new MockConsumer(msgListSub2); - MockConsumer sub3 = new MockConsumer(msgListSub3); - - // register the consumers - testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); - testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); - testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); - - //check that no messages have been delivered to the - //consumers during registration - assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); - - // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue) - { - @Override - public void run() - { - // we don't actually want/need this runner to do any work - // because we we are already doing it! - } - }); - - // check expected messages delivered to correct consumers - verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages()); - verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages()); - verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); - } - - private AbstractQueue createNonAsyncDeliverQueue() - { - return new NonAsyncDeliverQueue(getVirtualHost()); - } - /** * Tests that dequeued message is not present in the list returned form * {@link AbstractQueue#getMessagesOnTheQueue()} @@ -1055,16 +973,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase return entry; } - private List<String> createEntriesList(QueueEntry... entries) - { - ArrayList<String> entriesList = new ArrayList<String>(); - for (QueueEntry entry : entries) - { - entriesList.add(entry.getMessage().getMessageHeader().getMessageId()); - } - return entriesList; - } - protected void verifyReceivedMessages(List<MessageInstance> expected, List<MessageInstance> delivered) { @@ -1210,90 +1118,4 @@ abstract class AbstractQueueTestBase extends QpidTestCase return _consumerTarget; } - private static class NonAsyncDeliverEntry extends OrderedQueueEntry - { - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList) - { - super(queueEntryList); - } - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, - final ServerMessage message, - final long entryId) - { - super(queueEntryList, message, entryId); - } - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message) - { - super(queueEntryList, message); - } - } - - private static class NonAsyncDeliverList extends OrderedQueueEntryList - { - - private static final HeadCreator HEAD_CREATOR = - new HeadCreator() - { - - @Override - public NonAsyncDeliverEntry createHead(final QueueEntryList list) - { - return new NonAsyncDeliverEntry((NonAsyncDeliverList) list); - } - }; - - public NonAsyncDeliverList(final NonAsyncDeliverQueue queue) - { - super(queue, HEAD_CREATOR); - } - - @Override - protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message) - { - return new NonAsyncDeliverEntry(this,message); - } - } - - - private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue> - { - private QueueEntryList _entries = new NonAsyncDeliverList(this); - - public NonAsyncDeliverQueue(VirtualHostImpl vhost) - { - super(attributes(), vhost); - } - - @Override - protected void onOpen() - { - super.onOpen(); - } - - @Override - QueueEntryList getEntries() - { - return _entries; - } - - private static Map<String,Object> attributes() - { - Map<String,Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, "test"); - attributes.put(Queue.DURABLE, false); - attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - return attributes; - } - - @Override - public void deliverAsync(QueueConsumer<?> sub) - { - // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new consumers - } - } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 7fc36f39bb..004096082a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -180,14 +180,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "test"); // create queue with overridden method deliverAsync - StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost()) - { - @Override - public void deliverAsync(QueueConsumer sub) - { - // do nothing - } - }; + StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost()); testQueue.create(); // put messages diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index d593c1f594..78228c209f 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -307,7 +307,7 @@ public class ServerConnectionDelegate extends ServerDelegate @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures - // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop + // that any in-progress delivery (QueueRunner) is completed before the stop // completes. stopAllSubscriptions(conn, dtc); Session ssn = conn.getSession(dtc.getChannel()); |
