From cbee9e6623bd4c1a9790613c39517a600ca289d6 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Fri, 19 Jan 2007 10:35:21 +0000 Subject: QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@497770 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 83 ++++- .../qpid/server/ack/UnacknowledgedMessageMap.java | 2 + .../server/ack/UnacknowledgedMessageMapImpl.java | 28 +- .../server/exchange/DefaultExchangeRegistry.java | 24 +- .../qpid/server/exchange/ExchangeRegistry.java | 2 + .../server/handler/BasicConsumeMethodHandler.java | 44 +-- .../server/handler/BasicPublishMethodHandler.java | 17 +- .../qpid/server/handler/BasicQosHandler.java | 2 + .../server/handler/BasicRecoverMethodHandler.java | 4 +- .../handler/ConnectionOpenMethodHandler.java | 11 +- .../server/handler/ExchangeDeclareHandler.java | 4 +- .../qpid/server/handler/QueueDeclareHandler.java | 22 +- .../qpid/server/handler/QueueDeleteHandler.java | 10 +- .../qpid/server/handler/TxRollbackHandler.java | 2 +- .../server/protocol/AMQMinaProtocolSession.java | 55 ++- .../qpid/server/protocol/AMQProtocolSession.java | 2 + .../qpid/server/protocol/ExchangeInitialiser.java | 2 + .../org/apache/qpid/server/queue/AMQMessage.java | 82 ++++- .../org/apache/qpid/server/queue/AMQQueue.java | 139 ++++++-- .../server/queue/ConcurrentDeliveryManager.java | 367 --------------------- .../queue/ConcurrentSelectorDeliveryManager.java | 77 ++++- .../apache/qpid/server/queue/DeliveryManager.java | 6 +- .../org/apache/qpid/server/queue/Subscription.java | 2 + .../apache/qpid/server/queue/SubscriptionImpl.java | 59 +++- .../apache/qpid/server/queue/SubscriptionSet.java | 2 +- .../server/queue/SynchronizedDeliveryManager.java | 268 --------------- .../apache/qpid/server/state/AMQStateManager.java | 19 ++ 27 files changed, 552 insertions(+), 783 deletions(-) delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 10f039779c..2529ddc064 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.mina.common.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,8 @@ public class AMQChannel private long _prefetch_LowWaterMark; + private long _prefetchSize; + /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the last tag sent out @@ -108,6 +111,8 @@ public class AMQChannel private Set _browsedAcks = new HashSet(); + + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -151,6 +156,17 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } + public long getPrefetchSize() + { + return _prefetchSize; + } + + + public void setPrefetchSize(long prefetchSize) + { + _prefetchSize = prefetchSize; + } + public long getPrefetchLowMarkCount() { return _prefetch_LowWaterMark; @@ -213,14 +229,15 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - // returns true iff the message was delivered (i.e. if all data was - // received if (_log.isDebugEnabled()) { _log.debug("Content body received on channel " + _channelId); } try { + + // returns true iff the message was delivered (i.e. if all data was + // received if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) { // callback to allow the context to do any post message processing @@ -269,13 +286,14 @@ public class AMQChannel * @param queue the queue to subscribe to * @param session the protocol session of the subscriber * @param noLocal + * @param exclusive * @return the consumer tag. This is returned to the subscriber and used in * subsequent unsubscribe requests * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -286,7 +304,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal); + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; } @@ -364,8 +382,10 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(final AMQProtocolSession session) throws AMQException + public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException { + final List msgToRequeue = new LinkedList(); + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException @@ -374,7 +394,20 @@ public class AMQChannel AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); - msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + { + msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null && (consumerTag == null || requeue)) + { + msgToRequeue.add(message); + } + } // false means continue processing return false; } @@ -383,6 +416,12 @@ public class AMQChannel { } }); + + for(UnacknowledgedMessage message : msgToRequeue) + { + _txnContext.deliver(message.message, message.queue); + _unacknowledgedMessageMap.remove(message.deliveryTag); + } } /** @@ -459,8 +498,9 @@ public class AMQChannel { boolean suspend; - suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; - + suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) + || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + setSuspended(suspend); } @@ -545,4 +585,31 @@ public class AMQChannel } _returnMessages.clear(); } + + + public boolean wouldSuspend(AMQMessage msg) + { + if (isSuspended()) + { + return true; + } + else + { + boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); + if(!willSuspend) + { + final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); + + willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize); + } + + + if(willSuspend) + { + setSuspended(true); + } + return willSuspend; + } + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index ef58ba01a3..7ea22a447f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -73,5 +73,7 @@ public interface UnacknowledgedMessageMap * @return a set of delivery tags */ Set getDeliveryTags(); + + public long getUnacknowledgeBytes(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index a21e4cfff6..e50d239d57 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -32,6 +32,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { private final Object _lock = new Object(); + private long _unackedSize; + private Map _map; private long _lastDeliveryTag; @@ -77,7 +79,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { for (UnacknowledgedMessage msg : msgs) { - _map.remove(msg.deliveryTag); + remove(msg.deliveryTag); + } } } @@ -86,7 +89,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - return _map.remove(deliveryTag); + + UnacknowledgedMessage message = _map.remove(deliveryTag); + if(message != null) + { + _unackedSize -= message.message.getSize(); + } + + return message; } } @@ -113,6 +123,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); + _unackedSize += message.message.getSize(); _lastDeliveryTag = deliveryTag; } } @@ -123,6 +134,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { Collection currentEntries = _map.values(); _map = new LinkedHashMap(_prefetchLimit); + _unackedSize = 0l; return currentEntries; } } @@ -149,6 +161,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.clear(); + _unackedSize = 0l; } } @@ -169,6 +182,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } it.remove(); + _unackedSize -= unacked.getValue().message.getSize(); destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) @@ -189,7 +203,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap AMQShortString consumerTag = entry.getValue().consumerTag; AMQMessage msg = entry.getValue().message; - msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + if(consumerTag != null) + { + msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + } } } } @@ -224,4 +241,9 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } } + + public long getUnacknowledgeBytes() + { + return _unackedSize; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index cadcd22001..374772bc4a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -38,6 +38,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ private ConcurrentMap _exchangeMap = new ConcurrentHashMap(); + private Exchange _defaultExchange; + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: @@ -53,9 +55,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { + if(_defaultExchange == null) + { + setDefaultExchange(exchange); + } _exchangeMap.put(exchange.getName(), exchange); } + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException { // TODO: check inUse argument @@ -72,7 +83,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(AMQShortString name) { - return _exchangeMap.get(name); + + if(name == null || name.length() == 0) + { + return _defaultExchange; + } + else + { + return _exchangeMap.get(name); + } + } /** @@ -83,7 +103,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void routeContent(AMQMessage payload) throws AMQException { final AMQShortString exchange = payload.getPublishBody().exchange; - final Exchange exch = _exchangeMap.get(exchange); + final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index efcb963f8b..24884d20d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -38,4 +38,6 @@ public interface ExchangeRegistry extends MessageRouter void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException; Exchange getExchange(AMQShortString name); + + void setDefaultExchange(Exchange exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index e078b0cdee..721001b454 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; @@ -66,6 +68,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener AMQProtocolSession session, AMQMethodEvent evt) throws AMQException { session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); + session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 0e37871439..f3e0cc3a63 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -52,6 +52,8 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener evt) throws AMQException { ConnectionOpenBody body = evt.getMethod(); - AMQShortString contextKey = body.virtualHost; + + //todo //FIXME The virtual host must be validated by the server for the connection to open-ok // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (contextKey == null) + if (protocolSession.getContextKey() == null) { - contextKey = generateClientID(); + protocolSession.setContextKey(generateClientID()); } - protocolSession.setContextKey(contextKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, (byte)8, (byte)0, // AMQP version (major, minor) - contextKey); // knownHosts + body.virtualHost); // knownHosts stateManager.changeState(AMQState.CONNECTION_OPEN); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index f6897227aa..84e9a4e3f4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener @@ -84,15 +85,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener 1) + { + if(isExclusive()) + { + decrementSubscriberCount(); + throw EXISTING_EXCLUSIVE; + } + else if(exclusive) + { + decrementSubscriberCount(); + throw EXISTING_SUBSCRIPTION; + } + + } + else if(exclusive) + { + setExclusive(true); + } + debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); @@ -385,6 +409,28 @@ public class AMQQueue implements Managable, Comparable _subscribers.addSubscriber(subscription); } + + private boolean isExclusive() + { + return _isExclusive.get(); + } + + private void setExclusive(boolean exclusive) + { + _isExclusive.set(exclusive); + } + + private int incrementSubscriberCount() + { + return _subscriberCount.incrementAndGet(); + } + + private int decrementSubscriberCount() + { + return _subscriberCount.decrementAndGet(); + } + + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, @@ -400,6 +446,10 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + setExclusive(false); + decrementSubscriberCount(); + + // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -454,6 +504,23 @@ public class AMQQueue implements Managable, Comparable delete(); } + public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException + { + _deliveryMgr.deliver(storeContext, getName(), msg); + try + { + msg.checkDeliveredToConsumer(); + updateReceivedMessageCount(msg); + } + catch (NoConsumersException e) + { + // as this message will be returned, it should be removed + // from the queue: + dequeue(storeContext, msg); + } + } + + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { _deliveryMgr.deliver(storeContext, getName(), msg); @@ -547,4 +614,12 @@ public class AMQQueue implements Managable, Comparable _logger.debug(MessageFormat.format(msg, args)); } } + + public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException + { + return _deliveryMgr.performGet(session, channel, acks); + } + + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java deleted file mode 100644 index 1a44e86f1a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * Manages delivery of messages on behalf of a queue - */ -public class ConcurrentDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); - - @Configured(path = "advanced.compressBufferOnQueue", - defaultValue = "false") - public boolean compressBufferOnQueue; - - /** - * Holds any queued messages - */ - private final Queue _messages = new ConcurrentLinkedQueueAtomicSize(); - - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - /** - * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - *

- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. - */ - private ReentrantLock _lock = new ReentrantLock(); - - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - - //Set values from configuration - Configurator.configure(this); - - if (compressBufferOnQueue) - { - _log.info("Compressing Buffers on queue."); - } - - _subscriptions = subscriptions; - _queue = queue; - } - - /** - * @return boolean if we are queueing - */ - private boolean queueing() - { - return hasQueuedMessages(); - } - - /** - * @param msg to enqueue - * @return true if we are queue this message - */ - private boolean enqueue(AMQMessage msg) throws AMQException - { - if (msg.getPublishBody().immediate) - { - return false; - } - else - { - _lock.lock(); - try - { - if (queueing()) - { - return addMessageToQueue(msg); - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - } - - private void startQueueing(AMQMessage msg) throws AMQException - { - if (!msg.getPublishBody().immediate) - { - addMessageToQueue(msg); - } - } - - private boolean addMessageToQueue(AMQMessage msg) - { - // Shrink the ContentBodies to their actual size to save memory. - /* TODO need to reimplement this - probably not in this class though - * for obvious reasons - - if (compressBufferOnQueue) - { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - } - */ - _messages.offer(msg); - - return true; - } - - public boolean hasQueuedMessages() - { - _lock.lock(); - try - { - return !_messages.isEmpty(); - } - finally - { - _lock.unlock(); - } - } - - public int getQueueMessageCount() - { - return getMessageCount(); - } - - /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. - * - * @return int the number of messages in the delivery queue. - */ - private int getMessageCount() - { - return _messages.size(); - } - - - public synchronized List getMessages() - { - return new ArrayList(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(storeContext, _queue); - } - } - - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(storeContext, _queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() throws AMQException - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - AMQMessage message = peek(); - - //While we have messages to send and subscribers to send them to. - while (message != null && hasSubscribers) - { - // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); - Subscription next = _subscriptions.nextSubscriber(message); - //FIXME Is there still not the chance that this subscribe could be suspended between here and the send? - - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - next.send(message, _queue); - poll(); - message = peek(); - } - else - { - hasSubscribers = false; - } - } - } - catch (FailedDequeueException e) - { - _log.error("Unable to deliver message as dequeue failed: " + e, e); - } - finally - { - _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); - } - } - - private AMQMessage peek() - { - return _messages.peek(); - } - - private AMQMessage poll() - { - return _messages.poll(); - } - - Runner asyncDelivery = new Runner(); - - public void processAsync(Executor executor) - { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); - - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(asyncDelivery); - } - } - } - } - - public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - // not queueing so deliver message to 'next' subscriber - _lock.lock(); - try - { - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - if (!msg.getPublishBody().immediate) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - } - else - { - s.send(msg, _queue); - } - } - finally - { - _lock.unlock(); - } - } - } - - private class Runner implements Runnable - { - public void run() - { - boolean running = true; - while (running) - { - try - { - processQueue(); - } - catch (AMQException e) - { - _log.error("Error processing queue: " + e, e); - _log.error("Delivery manager terminating."); - running = false; - _processing.set(false); - break; - } - - //Check that messages have not been added since we did our last peek(); - // Synchronize with the thread that adds to the queue. - // If the queue is still empty then we can exit - _lock.lock(); - try - { - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) - { - running = false; - _processing.set(false); - } - } - finally - { - _lock.unlock(); - } - } - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 91c49a4cf9..ba4d0bf4ba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.ArrayList; import java.util.Iterator; @@ -52,6 +54,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Holds any queued messages */ private final Queue _messages = new ConcurrentLinkedQueueAtomicSize(); + + private final ReentrantLock _messageAccessLock = new ReentrantLock(); + //private int _messageCount; /** * Ensures that only one asynchronous task is running for this manager at @@ -169,6 +174,56 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException + { + AMQMessage msg = getNextMessage(); + if(msg == null) + { + return false; + } + else + { + + try + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. + + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!acks) + { + if (_log.isDebugEnabled()) + { + _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + _queue.dequeue(channel.getStoreContext(), msg); + } + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); + + if (acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); + } + + msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + } + } + finally + { + msg.setDeliveredToConsumer(); + } + return true; + + } + } + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); @@ -178,22 +233,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException + public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException { + long count = 0; AMQMessage msg = poll(); while (msg != null) { msg.dequeue(storeContext, _queue); + count++; msg = poll(); } + return count; + } + + public synchronized AMQMessage getNextMessage() throws AMQException + { + return getNextMessage(_messages); } - private AMQMessage getNextMessage(Queue messages, Subscription sub) + private AMQMessage getNextMessage(Queue messages) + { + return getNextMessage(messages, false); + } + + private AMQMessage getNextMessage(Queue messages, boolean browsing) { AMQMessage message = messages.peek(); - while (message != null && (sub.isBrowser() || message.taken())) + while (message != null && (browsing || message.taken())) { //remove the already taken message messages.poll(); @@ -208,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = null; try { - message = getNextMessage(messageQueue, sub); + message = getNextMessage(messageQueue, sub.isBrowser()); // message will be null if we have no messages in the messageQueue. if (message == null) @@ -287,6 +355,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug(id() + "deliver :" + msg); } + msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index d3d235f07f..6954be8473 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.concurrent.Executor; import java.util.List; @@ -72,9 +74,11 @@ interface DeliveryManager void removeAMessageFromTop(StoreContext storeContext) throws AMQException; - void clearAllMessages(StoreContext storeContext) throws AMQException; + long clearAllMessages(StoreContext storeContext) throws AMQException; List getMessages(); void populatePreDeliveryQueue(Subscription subscription); + + boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 2dab551e07..5277069d33 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -45,4 +45,6 @@ public interface Subscription void close(); boolean isBrowser(); + + boolean wouldSuspend(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e2356faaf5..e120752959 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -66,6 +66,7 @@ public class SubscriptionImpl implements Subscription private final boolean _isBrowser; private final Boolean _autoClose; private boolean _closed = false; + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); public static class Factory implements SubscriptionFactory { @@ -300,37 +301,54 @@ public class SubscriptionImpl implements Subscription { if (_noLocal) { + boolean isLocal; // We don't want local messages so check to see if message is one we sent - Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()); - Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString()); + Object localInstance; + Object msgInstance; - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + if((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (_logger.isTraceEnabled()) + if((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; + } } - return false; } - else // if not then filter the message. + else { - if (_logger.isTraceEnabled()) + localInstance = protocolSession.getClientIdentifier(); + msgInstance = msg.getPublisher().getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { - _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) + - ") but not ours so filtering"); + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; } - return checkFilters(msg); + } + + } - else + + + if (_logger.isTraceEnabled()) { - if (_logger.isTraceEnabled()) - { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); - } - return checkFilters(msg); + _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); } + return checkFilters(msg); + } private boolean checkFilters(AMQMessage msg) @@ -393,6 +411,11 @@ public class SubscriptionImpl implements Subscription return _isBrowser; } + public boolean wouldSuspend(AMQMessage msg) + { + return channel.wouldSuspend(msg); + } + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, AMQShortString routingKey, AMQShortString exchange) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 8272202571..e7c90fb201 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -137,7 +137,7 @@ class SubscriptionSet implements WeightedSubscriptionManager ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { if (subscription.hasInterest(msg)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java deleted file mode 100644 index 02fe86a083..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.StoreContext; -import org.apache.log4j.Logger; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Manages delivery of messages on behalf of a queue - */ -class SynchronizedDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); - - /** - * Holds any queued messages - */ - private final Queue _messages = new LinkedList(); - - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * An indication of the mode we are in. If this is true then messages are - * being queued up in _messages for asynchronous delivery. If it is false - * then messages can be delivered directly as they come in. - */ - private volatile boolean _queueing; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - _subscriptions = subscriptions; - _queue = queue; - } - - private synchronized boolean enqueue(AMQMessage msg) throws AMQException - { - if (msg.getPublishBody().immediate) - { - return false; - } - else - { - if (_queueing) - { - _messages.offer(msg); - return true; - } - else - { - return false; - } - } - } - - private synchronized void startQueueing(AMQMessage msg) throws AMQException - { - _queueing = true; - enqueue(msg); - } - - /** - * Determines whether there are queued messages. Sets _queueing to false if - * there are no queued messages. This needs to be atomic. - * - * @return true if there are queued messages - */ - public synchronized boolean hasQueuedMessages() - { - boolean empty = _messages.isEmpty(); - if (empty) - { - _queueing = false; - } - return !empty; - } - - public synchronized int getQueueMessageCount() - { - return _messages.size(); - } - - public synchronized List getMessages() - { - return new ArrayList(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(storeContext, _queue); - } - } - - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(storeContext, _queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasQueuedMessages() && hasSubscribers) - { - Subscription next = _subscriptions.nextSubscriber(peek()); - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - try - { - next.send(poll(), _queue); - } - catch (AMQException e) - { - _log.error("Unable to deliver message: " + e, e); - } - } - else - { - hasSubscribers = false; - } - } - } - finally - { - _processing.set(false); - } - } - - private synchronized AMQMessage peek() - { - return _messages.peek(); - } - - private synchronized AMQMessage poll() - { - return _messages.poll(); - } - - /** - * Requests that the delivery manager start processing the queue asynchronously - * if there is work that can be done (i.e. there are messages queued up and - * subscribers that can receive them. - *

- * This should be called when subscribers are added, but only after the consume-ok - * message has been returned as message delivery may start immediately. It should also - * be called after unsuspending a client. - *

- * - * @param executor the executor on which the delivery should take place - */ - public void processAsync(Executor executor) - { - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(new Runner()); - } - } - } - } - - /** - * Handles message delivery. The delivery manager is always in one of two modes; - * it is either queueing messages for asynchronous delivery or delivering - * directly. - * - * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver - * @throws NoConsumersException if there are no active subscribers to deliver - * the message to - */ - public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - synchronized(this) - { - // not queueing so deliver message to 'next' subscriber - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - else - { - s.send(msg, _queue); - } - } - } - - } - - private class Runner implements Runnable - { - public void run() - { - processQueue(); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 70e530699e..81ce704026 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; @@ -28,6 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.AMQChannel; import org.apache.log4j.Logger; import java.util.HashMap; @@ -118,12 +121,14 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); + frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance()); frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance()); frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance()); frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); + frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance()); frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance()); frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance()); @@ -168,12 +173,26 @@ public class AMQStateManager implements AMQMethodListener StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { + + checkChannel(evt, _protocolSession); + handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); return true; } return false; } + private void checkChannel(AMQMethodEvent evt, AMQProtocolSession protocolSession) + throws AMQException + { + if(evt.getChannelId() != 0 + && !(evt.getMethod() instanceof ChannelOpenBody) + && protocolSession.getChannel(evt.getChannelId()) == null) + { + throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId()); + } + } + protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState, B frame) throws IllegalStateTransitionException -- cgit v1.2.1