summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java8
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java122
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java194
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java2
7 files changed, 18 insertions, 357 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6c439993c7..8328baca3f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -244,6 +244,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
+ private final QueueRunner _queueRunner = new QueueRunner(this);
+
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
super(parentsMap(virtualHost), attributes);
@@ -745,7 +747,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- deliverAsync(consumer);
+ deliverAsync();
return consumer;
@@ -1006,14 +1008,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
checkConsumersNotAheadOfDelivery(entry);
- if (exclusiveSub != null)
- {
- deliverAsync(exclusiveSub);
- }
- else
- {
- deliverAsync();
- }
+ deliverAsync();
}
checkForNotification(entry.getMessage());
@@ -1490,7 +1485,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync(sub);
+ deliverAsync();
}
}
@@ -1859,8 +1854,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- private final QueueRunner _queueRunner = new QueueRunner(this);
-
public void deliverAsync()
{
_stateChangeCount.incrementAndGet();
@@ -1869,20 +1862,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
- public void deliverAsync(QueueConsumer<?> sub)
- {
- if(_exclusiveSubscriber == null)
- {
- deliverAsync();
- }
- else
- {
- SubFlushRunner flusher = sub.getRunner();
- flusher.execute();
- }
-
- }
-
void flushConsumer(QueueConsumer<?> sub)
{
@@ -2100,10 +2079,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * consumer (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to consumers unsuspending) which are
- * capable of accepting/delivering all messages then these messages would
- * otherwise remain on the queue.
+ * consumer (i.e. asynchronous delivery is required).
*
* processQueue should be running while there are messages on the queue AND
* there are consumers that can deliver them. If there are no
@@ -2412,7 +2388,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync(_sub);
+ deliverAsync();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 02cd7ff56f..a2c275e797 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -44,8 +44,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
void queueDeleted();
- SubFlushRunner getRunner();
-
AMQQueue getQueue();
boolean resend(QueueEntry e);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index b33a72be10..c85e4058a1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -86,7 +86,6 @@ class QueueConsumerImpl
}
private final ConsumerTarget _target;
- private final SubFlushRunner _runner = new SubFlushRunner(this);
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
_listener;
private volatile QueueContext _queueContext;
@@ -210,7 +209,7 @@ class QueueConsumerImpl
@Override
public void externalStateChange()
{
- _queue.deliverAsync(this);
+ _queue.deliverAsync();
}
@Override
@@ -324,11 +323,6 @@ class QueueConsumerImpl
return getQueue().resend(entry, this);
}
- public final SubFlushRunner getRunner()
- {
- return _runner;
- }
-
public final long getConsumerNumber()
{
return _consumerNumber;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
deleted file mode 100755
index c860918a0b..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-import java.security.PrivilegedAction;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.security.auth.Subject;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.transport.TransportException;
-
-
-class SubFlushRunner implements Runnable
-{
- private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
-
-
- private final QueueConsumerImpl _sub;
-
- private static int IDLE = 0;
- private static int SCHEDULED = 1;
- private static int RUNNING = 2;
-
-
- private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
-
-
- private final AtomicBoolean _stateChange = new AtomicBoolean();
-
- public SubFlushRunner(QueueConsumerImpl sub)
- {
- _sub = sub;
- }
-
- public void run()
- {
- if(_scheduled.compareAndSet(SCHEDULED, RUNNING))
- {
- Subject.doAs(SecurityManager.getSystemTaskSubject("Sub. Delivery"), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- boolean complete = false;
- _stateChange.set(false);
- try
- {
- complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries());
- }
- catch (ConnectionScopedRuntimeException | TransportException e)
- {
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, e);
- }
- else
- {
- _logger.info(errorMessage + ' ' + e.getMessage());
- }
- }
- finally
- {
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
- {
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
- {
- getQueue().execute(SubFlushRunner.this);
- }
- }
- }
- return null;
- }
- });
-
- }
- }
-
- private AbstractQueue<?> getQueue()
- {
- return (AbstractQueue<?>) _sub.getQueue();
- }
-
- public String toString()
- {
- return "SubFlushRunner-" + _sub.toLogString();
- }
-
- public void execute()
- {
- _stateChange.set(true);
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
- {
- getQueue().execute(this);
- }
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 9255dbf42e..c1a9240f2c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -261,7 +261,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
4,
@@ -311,7 +311,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(messageA, postEnqueueAction);
int subFlushWaitTime = 150;
- Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
1,
@@ -322,7 +322,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
queueEntries.get(0).release();
- Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed",
@@ -360,7 +360,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -374,7 +374,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
queueEntries.get(2).release();
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
5,
@@ -417,7 +417,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both after enqueue",
2,
@@ -426,7 +426,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
@@ -645,88 +645,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase
assertEquals("Message ID was wrong", msgID, 10L);
}
-
- /**
- * processQueue() is used when asynchronously delivering messages to
- * consumers which could not be delivered immediately during the
- * enqueue() operation.
- *
- * A defect within the method would mean that delivery of these messages may
- * not occur should the Runner stop before all messages have been processed.
- * Such a defect was discovered when Selectors were used such that one and
- * only one consumer can/will accept any given messages, but multiple
- * consumers are present, and one of the earlier consumers receives
- * more messages than the others.
- *
- * This test is to validate that the processQueue() method is able to
- * correctly deliver all of the messages present for asynchronous delivery
- * to consumers in such a scenario.
- */
- public void testProcessQueueWithUniqueSelectors() throws Exception
- {
- AbstractQueue testQueue = createNonAsyncDeliverQueue();
- testQueue.open();
-
- // retrieve the QueueEntryList the queue creates and insert the test
- // messages, thus avoiding straight-through delivery attempts during
- //enqueue() process.
- QueueEntryList list = testQueue.getEntries();
- assertNotNull("QueueEntryList should have been created", list);
-
- QueueEntry msg1 = list.add(createMessage(1L));
- QueueEntry msg2 = list.add(createMessage(2L));
- QueueEntry msg3 = list.add(createMessage(3L));
- QueueEntry msg4 = list.add(createMessage(4L));
- QueueEntry msg5 = list.add(createMessage(5L));
-
- // Create lists of the entries each consumer should be interested
- // in.Bias over 50% of the messages to the first consumer so that
- // the later consumers reject them and report being done before
- // the first consumer as the processQueue method proceeds.
- List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
- List<String> msgListSub2 = createEntriesList(msg4);
- List<String> msgListSub3 = createEntriesList(msg5);
-
- MockConsumer sub1 = new MockConsumer(msgListSub1);
- MockConsumer sub2 = new MockConsumer(msgListSub2);
- MockConsumer sub3 = new MockConsumer(msgListSub3);
-
- // register the consumers
- testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
- testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
- testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
-
- //check that no messages have been delivered to the
- //consumers during registration
- assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
- assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
- assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
-
- // call processQueue to deliver the messages
- testQueue.processQueue(new QueueRunner(testQueue)
- {
- @Override
- public void run()
- {
- // we don't actually want/need this runner to do any work
- // because we we are already doing it!
- }
- });
-
- // check expected messages delivered to correct consumers
- verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages());
- verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages());
- verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
- }
-
- private AbstractQueue createNonAsyncDeliverQueue()
- {
- return new NonAsyncDeliverQueue(getVirtualHost());
- }
-
/**
* Tests that dequeued message is not present in the list returned form
* {@link AbstractQueue#getMessagesOnTheQueue()}
@@ -1055,16 +973,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase
return entry;
}
- private List<String> createEntriesList(QueueEntry... entries)
- {
- ArrayList<String> entriesList = new ArrayList<String>();
- for (QueueEntry entry : entries)
- {
- entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
- }
- return entriesList;
- }
-
protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
@@ -1210,90 +1118,4 @@ abstract class AbstractQueueTestBase extends QpidTestCase
return _consumerTarget;
}
- private static class NonAsyncDeliverEntry extends OrderedQueueEntry
- {
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
- {
- super(queueEntryList);
- }
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
- final ServerMessage message,
- final long entryId)
- {
- super(queueEntryList, message, entryId);
- }
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message)
- {
- super(queueEntryList, message);
- }
- }
-
- private static class NonAsyncDeliverList extends OrderedQueueEntryList
- {
-
- private static final HeadCreator HEAD_CREATOR =
- new HeadCreator()
- {
-
- @Override
- public NonAsyncDeliverEntry createHead(final QueueEntryList list)
- {
- return new NonAsyncDeliverEntry((NonAsyncDeliverList) list);
- }
- };
-
- public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
- {
- super(queue, HEAD_CREATOR);
- }
-
- @Override
- protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message)
- {
- return new NonAsyncDeliverEntry(this,message);
- }
- }
-
-
- private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue>
- {
- private QueueEntryList _entries = new NonAsyncDeliverList(this);
-
- public NonAsyncDeliverQueue(VirtualHostImpl vhost)
- {
- super(attributes(), vhost);
- }
-
- @Override
- protected void onOpen()
- {
- super.onOpen();
- }
-
- @Override
- QueueEntryList getEntries()
- {
- return _entries;
- }
-
- private static Map<String,Object> attributes()
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, "test");
- attributes.put(Queue.DURABLE, false);
- attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
- return attributes;
- }
-
- @Override
- public void deliverAsync(QueueConsumer<?> sub)
- {
- // do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new consumers
- }
- }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 7fc36f39bb..004096082a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -180,14 +180,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "test");
// create queue with overridden method deliverAsync
- StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost())
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing
- }
- };
+ StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost());
testQueue.create();
// put messages
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index d593c1f594..78228c209f 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -307,7 +307,7 @@ public class ServerConnectionDelegate extends ServerDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
- // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop
+ // that any in-progress delivery (QueueRunner) is completed before the stop
// completes.
stopAllSubscriptions(conn, dtc);
Session ssn = conn.getSession(dtc.getChannel());