summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-27 12:43:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-27 12:43:04 +0000
commit980643b9364f2ec7e75f9e4a391754f5db4bc24a (patch)
treea7f43779191f41bc9d8413460c302ff25a67ab76 /java/broker
parent4b1cc6b00ded3584ed2f11431845de09f195ed14 (diff)
downloadqpid-python-980643b9364f2ec7e75f9e4a391754f5db4bc24a.tar.gz
Refactoring updates (job queue changes, enqueue collections..)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@660490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java351
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java34
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
12 files changed, 384 insertions, 156 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 441f88b9b6..d1bea3410b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -248,8 +249,10 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
boolean routed = false;
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
+
if (e.binding.matches(headers))
{
if (_logger.isDebugEnabled())
@@ -257,10 +260,12 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
headers + " to " + e.queue.getName());
}
- payload.enqueue(e.queue);
+ queues.add(e.queue);
+
routed = true;
}
}
+ payload.enqueue(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 8d3110ef18..d07501a188 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -262,8 +262,24 @@ public class TopicExchange extends AbstractExchange
_filteredQueues.put(queue,newFilters);
}
- public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
{
+ if(queues == null)
+ {
+ if(_filteredQueues.isEmpty())
+ {
+ return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+ }
+ else
+ {
+ queues = new HashSet<AMQQueue>();
+ }
+ }
+ else if(!(queues instanceof Set))
+ {
+ queues = new HashSet<AMQQueue>(queues);
+ }
+
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
@@ -621,11 +637,11 @@ public class TopicExchange extends AbstractExchange
}
else
{
- Set<AMQQueue> queues = new HashSet<AMQQueue>();
+ Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
- ((TopicExchangeResult)result).processMessage(message, queues);
+ queues = ((TopicExchangeResult)result).processMessage(message, queues);
}
return queues;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 0a6bfb15e6..bdb16d0fcb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -444,7 +444,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public boolean channelAwaitingClosure(int channelId)
{
- return _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 05533e0d2d..0e5e7aa68c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -136,6 +136,11 @@ public class AMQMessage implements Filterable<AMQException>
}
}
+ public void clearStoreContext()
+ {
+ _storeContext = new StoreContext();
+ }
+
public StoreContext getStoreContext()
{
return _storeContext;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 780cd49834..570bd97a28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -58,15 +58,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void unregisterSubscription(final Subscription subscription) throws AMQException;
+
int getConsumerCount();
int getActiveConsumerCount();
boolean isUnused();
-
-
-
boolean isEmpty();
int getMessageCount();
@@ -80,10 +78,27 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
long getOldestMessageArrivalTime();
-
boolean isDeleted();
+ int delete() throws AMQException;
+
+
+ QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+
+ void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+
+ void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+
+
+
+ boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+
+
+
+ void addQueueDeleteTask(final Task task);
+
+
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
@@ -91,7 +106,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
QueueEntry getMessageOnTheQueue(long messageId);
-
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
StoreContext storeContext);
@@ -99,9 +113,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
- void quiesce();
- void start();
long getMaximumMessageSize();
@@ -132,27 +144,14 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
- int delete() throws AMQException;
-
-
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
-
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
-
- void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
- void deliverAsync();
-
- void addQueueDeleteTask(final Task task);
-
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
void removeExpiredIfNoSubscribers() throws AMQException;
Set<NotificationCheck> getNotificationChecks();
void flushSubscription(final Subscription sub) throws AMQException;
+ void deliverAsync(final Subscription sub);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 68b429efc6..81c8c04d6d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
else
{
+ int offset;
+ final int queueCount = destinationQueues.size();
+ if(queueCount == 1)
+ {
+ offset = 0;
+ }
+ else
+ {
+ offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ if(offset < 0)
+ {
+ offset = -offset;
+ }
+ }
+
+ int i = 0;
for (AMQQueue q : destinationQueues)
{
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
+ if(++i > offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
}
+ i = 0;
+ if(offset != 0)
+ {
+ for (AMQQueue q : destinationQueues)
+ {
+ if(i++ < offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
+ }
+ }
+
}
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
-
+ message.clearStoreContext();
return message;
}
finally
@@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
return _messagePublishInfo.isImmediate();
}
-
- public void enqueue(final AMQQueue q) throws AMQException
- {
- if(_destinationQueues == null)
- {
- _destinationQueues = new ArrayList<AMQQueue>();
- }
- _destinationQueues.add(q);
- }
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fea1db97b3..d26d6af7b2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private final AMQMessage _message;
+ private AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -376,7 +376,7 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
return true;
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 16d24e74ee..5baf48245c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -11,6 +11,8 @@ import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
import org.apache.log4j.Logger;
@@ -21,8 +23,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -84,7 +84,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
- private boolean _exclusiveSubscriber;
+ private volatile Subscription _exclusiveSubscriber;
private final QueueEntryList _entries;
@@ -116,9 +116,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
+
+
+ private static final int MAX_ASYNC_DELIVERIES = 10;
+
+
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -155,7 +161,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor();
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor();
try
{
@@ -235,11 +243,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// ------ Manage Subscriptions
- public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
+ public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
{
- if(_exclusiveSubscriber)
+ if(isExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
}
@@ -249,7 +257,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new ExistingSubscriptionPreventsExclusive();
}
- _exclusiveSubscriber = exclusive;
+ setExclusiveSubscriber(subscription);
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
@@ -274,7 +282,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void unregisterSubscription(final Subscription subscription) throws AMQException
+ public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
{
if(subscription == null)
{
@@ -289,9 +297,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
subscription.close();
// No longer can the queue have an exclusive consumer
- _exclusiveSubscriber = false;
+ setExclusiveSubscriber(null);
+ QueueEntry lastSeen;
+
+ while((lastSeen = subscription.getLastSeenEntry()) != null)
+ {
+ subscription.setLastSeenEntry(lastSeen, null);
+ }
+
@@ -329,83 +344,84 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry = _entries.add(message);
+ QueueEntry entry;
+ Subscription exclusiveSub = _exclusiveSubscriber;
+ if(exclusiveSub != null)
+ {
+ exclusiveSub.getSendLock();
- /*
+ try
+ {
+ entry = _entries.add(message);
+ deliverToSubscription(exclusiveSub, entry);
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if(nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while(nextNode != null)
- {
- if(_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
- {
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if(nextNode == null)
+ // where there is more than one producer there's a reasonable chance that even though there is
+ // no "queueing" we do not deliver because we get an interleving of _entries.add and
+ // deliverToSubscription between threads. Therefore have one more try.
+ if(!(entry.isAcquired() || entry.isDeleted()))
{
- nextNode = _subscriptionList.getHead().getNext();
+ deliverToSubscription(exclusiveSub, entry);
}
}
+ finally
+ {
+ exclusiveSub.releaseSendLock();
+ }
}
+ else
+ {
+ entry = _entries.add(message);
+ /*
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while(!entry.isAcquired() && loops != 0)
- {
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if(nextNode == null)
{
- loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _subscriptionList.getHead().getNext();
}
- else
+ while(nextNode != null)
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- synchronized(sub.getSendLock())
+ if(_lastSubscriptionNode.compareAndSet(node, nextNode))
{
- if(subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended()
- && sub.isActive())
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if(nextNode == null)
{
- if( !sub.wouldSuspend(entry))
- {
- if(!sub.isBrowser() && !entry.acquire(sub))
- {
- sub.restoreCredit(entry);
- }
- else
- {
- QueueEntry queueEntryNode = sub.getLastSeenEntry();
- if(_entries.next(queueEntryNode) == entry)
- {
- sub.setLastSeenEntry(queueEntryNode,entry);
- }
-
- deliverMessage(sub, entry);
-
- }
- }
+ nextNode = _subscriptionList.getHead().getNext();
}
}
}
- nextNode = nextNode.getNext();
- }
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while(!entry.isAcquired() && loops != 0)
+ {
+ if(nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.getNext();
+
+ }
+ }
if(entry.immediateAndNotDelivered())
@@ -413,7 +429,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
- else if(!entry.isAcquired())
+ else if(!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -435,6 +451,42 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ throws AMQException
+ {
+ sub.getSendLock();
+ try
+ {
+ if(subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended()
+ && sub.isActive())
+ {
+ if( !sub.wouldSuspend(entry))
+ {
+ if(!sub.isBrowser() && !entry.acquire(sub))
+ {
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ QueueEntry queueEntryNode = sub.getLastSeenEntry();
+ if(_entries.next(queueEntryNode) == entry)
+ {
+ sub.setLastSeenEntry(queueEntryNode,entry);
+ }
+
+ deliverMessage(sub, entry);
+
+ }
+ }
+ }
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -588,7 +640,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
entry to resend and move back the subscription pointer. */
- synchronized(subscription.getSendLock())
+ subscription.getSendLock();
+ try
{
if(!subscription.isClosed())
{
@@ -600,6 +653,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return false;
}
}
+ finally
+ {
+ subscription.releaseSendLock();
+ }
}
@@ -703,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -722,6 +779,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _atomicQueueSize;
}
+ private boolean isExclusiveSubscriber()
+ {
+ return _exclusiveSubscriber != null;
+ }
+
+ private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ {
+ _exclusiveSubscriber = exclusiveSubscriber;
+ }
+
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -999,22 +1066,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void quiesce()
- {
- _quiesced.set(true);
- }
-
- public void start()
- {
- if(_quiesced.compareAndSet(true,false))
- {
- deliverAsync();
- }
- }
-
-
-
-
// ------ Management functions
@@ -1088,6 +1139,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_deleteTaskList.clear();
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
return getMessageCount();
@@ -1098,13 +1150,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_stateChangeCount.incrementAndGet();
- if(_asynchronousRunner.get() == null)
- {
- _asyncDelivery.execute(new Runner());
+ Runner runner = new Runner();
+
+ if(_asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
}
}
- private class Runner implements Runnable
+ public void deliverAsync(Subscription sub)
+ {
+ _asyncDelivery.execute(new SubFlushRunner(sub));
+ }
+
+ private class Runner implements ReadWriteRunnable
{
public void run()
{
@@ -1118,21 +1177,77 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+ }
+
+
+ private class SubFlushRunner implements ReadWriteRunnable
+ {
+ private final Subscription _sub;
+
+
+ public SubFlushRunner(Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public void run()
+ {
+ boolean complete = false;
+ try
+ {
+ complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ if(!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
+
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
}
public void flushSubscription(Subscription sub) throws AMQException
{
+ flushSubscription(sub, Long.MAX_VALUE);
+ }
+
+ public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+ {
boolean atTail = false;
- while(sub.isActive() && !atTail)
+
+ while(!sub.isSuspended() && !atTail && deliveries != 0)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
if(sub.isActive())
{
-
QueueEntry node = moveSubscriptionToNextNode(sub);
-
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
@@ -1148,6 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
+ deliveries--;
deliverMessage(sub, node);
if(sub.isBrowser())
@@ -1159,8 +1275,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
}
-
-
}
}
@@ -1180,13 +1294,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
}
+
}
atTail = (_entries.next(node) == null);
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
+
+ if(!isExclusiveSubscriber())
+ {
+ advanceAllSubscriptions();
+ }
+
+ return atTail;
+ }
+
+ protected void advanceAllSubscriptions() throws AMQException
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ while(subscriberIter.advance())
+ {
+ SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
+ Subscription sub = subNode.getSubscription();
+ moveSubscriptionToNextNode(sub);
+ }
}
private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
@@ -1227,8 +1364,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean deliveryIncomplete = true;
int extraLoops = 1;
+ int deliveries = MAX_ASYNC_DELIVERIES;
+
+ _asynchronousRunner.compareAndSet(runner,null);
- while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+ while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
{
// we want to have one extra loop after the every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1251,17 +1391,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
Subscription sub = subscriptionIter.getNode().getSubscription();
if(sub != null)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+
if(sub.isActive())
{
boolean advanced = false;
+ boolean subActive = false;
- QueueEntry node = moveSubscriptionToNextNode(sub);
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
{
+ subActive = true;
if(sub.hasInterest(node))
{
if(!sub.wouldSuspend(node))
@@ -1274,6 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
else
{
deliverMessage(sub, node);
+ deliveries--;
if(sub.isBrowser())
{
@@ -1309,7 +1454,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
final boolean atTail = (_entries.next(node) == null);
- done = done && atTail;
+ done = done && (!subActive || atTail);
if(atTail && !advanced && sub.isAutoClose())
{
@@ -1322,6 +1467,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
if(done)
{
@@ -1346,6 +1495,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index acbeae0f40..537966e3aa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -28,7 +28,6 @@ import org.apache.qpid.server.queue.QueueEntry;
public interface Subscription
{
- boolean isActive();
public static enum State
@@ -75,7 +74,7 @@ public interface Subscription
boolean wouldSuspend(QueueEntry msg);
Object getSendLock();
-
+ void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
@@ -87,4 +86,9 @@ public interface Subscription
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+
+ boolean isActive();
+
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 653c7de514..8e124c8b0c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -62,6 +65,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final RecordDeliveryMethod _recordMethod;
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final Lock _stateChangeLock;
+ private final Lock _stateChangeExclusiveLock;
+
static final class BrowserSubscription extends SubscriptionImpl
@@ -254,7 +260,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
private AMQQueue _queue;
- private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
@@ -280,7 +287,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
-
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ _stateChangeLock = readWriteLock.readLock();
+ _stateChangeExclusiveLock = readWriteLock.writeLock();
if (arguments != null)
{
@@ -334,7 +343,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean isSuspended()
{
- return !isActive() || _channel.isSuspended() || _sendLock.get();
+ return !isActive() || _channel.isSuspended() || _deleted.get();
}
/**
@@ -344,7 +353,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
*/
public void queueDeleted(AMQQueue queue)
{
- _sendLock.set(true);
+ _deleted.set(true);
// _channel.queueDeleted(queue);
}
@@ -435,7 +444,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
boolean closed = false;
State state = getState();
- synchronized (_sendLock)
+
+ _stateChangeExclusiveLock.lock();
+ try
{
while(!closed && state != State.CLOSED)
{
@@ -451,6 +462,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
_creditManager.removeListener(this);
}
+ finally
+ {
+ _stateChangeExclusiveLock.unlock();
+ }
+
if (closed)
{
@@ -481,7 +497,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public Object getSendLock()
{
- return _sendLock;
+ _stateChangeLock.lock();
+ return _deleted;
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
}
public void resend(final QueueEntry entry) throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 9924d1c770..ca614e053a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -44,6 +44,7 @@ import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
import java.util.LinkedList;
+import java.util.Collections;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -303,7 +304,7 @@ public class AMQQueueAlertTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(_queue);
+ messages[i].enqueue(Collections.singleton(_queue));
messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 98f78e3d69..bf0a8a6d90 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -48,6 +48,7 @@ import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
+import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -216,7 +217,7 @@ public class AMQQueueMBeanTest extends TestCase
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- msg.enqueue(_queue);
+ msg.enqueue(Collections.singleton(_queue));
msg.routingComplete(_messageStore, new MessageHandleFactory());
msg.addContentBodyFrame(new ContentChunk()
@@ -318,7 +319,7 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
+ currentMessage.enqueue(Collections.singleton(_queue));
// route header
currentMessage.routingComplete(_messageStore, new MessageHandleFactory());