From b2023145c2b88ee458429663536cbab7ddd8f3b0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 8 Mar 2014 18:56:42 +0000 Subject: QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v0_10/ConsumerTarget_0_10.java | 12 +- .../server/protocol/v0_10/ServerConnection.java | 35 ++++++ .../qpid/server/protocol/v0_10/ServerSession.java | 90 +++++++++++++++ .../protocol/v0_10/ServerSessionDelegate.java | 14 +-- .../qpid/server/protocol/v0_8/AMQChannel.java | 124 +++++++++++++++++---- .../server/protocol/v0_8/AMQProtocolEngine.java | 43 ++++++- .../server/protocol/v0_8/ClientDeliveryMethod.java | 5 +- .../server/protocol/v0_8/ConsumerTarget_0_8.java | 13 +-- .../protocol/v0_8/ExtractResendAndRequeue.java | 4 +- .../server/protocol/v0_8/RecordDeliveryMethod.java | 4 +- .../v0_8/handler/BasicGetMethodHandler.java | 12 +- .../apache/qpid/server/protocol/v0_8/AckTest.java | 30 ++--- .../protocol/v0_8/ExtractResendAndRequeueTest.java | 8 +- .../protocol/v0_8/InternalTestProtocolSession.java | 4 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 41 ++++++- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 10 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 22 ++-- .../qpid/server/protocol/v1_0/Session_1_0.java | 89 ++++++++++++++- .../server/management/amqp/ManagementNode.java | 12 +- .../management/amqp/ManagementNodeConsumer.java | 6 +- .../server/management/amqp/ManagementResponse.java | 8 +- .../plugin/servlet/rest/MessageServlet.java | 4 +- 22 files changed, 480 insertions(+), 110 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index eeafb30642..a3fabf076c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; @@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_0_10(ServerSession session, @@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { consumer.getSendLock(); @@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index a3a80415ac..5e899aa635 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel> _taskList = new CopyOnWriteArrayList>(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + @Override public String getClientVersion() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index c2eacfe6e8..0bb3008d13 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -108,6 +114,9 @@ public class ServerSession extends Session private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + public static interface MessageDispositionChangeListener { @@ -133,6 +142,7 @@ public class ServerSession extends Session private final AtomicLong _txnCount = new AtomicLong(0); private Map _subscriptions = new ConcurrentHashMap(); + private final CopyOnWriteArrayList> _consumers = new CopyOnWriteArrayList>(); private final List> _taskList = new CopyOnWriteArrayList>(); @@ -458,6 +468,18 @@ public class ServerSession extends Session _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } + + public void register(final ConsumerImpl consumerImpl) + { + if(consumerImpl instanceof Consumer) + { + final Consumer consumer = (Consumer) consumerImpl; + _consumers.add(consumer); + consumer.addChangeListener(_consumerClosedListener); + consumerAdded(consumer); + } + } + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); @@ -948,6 +970,41 @@ public class ServerSession extends Session return _subscriptions.values().size(); } + @Override + public Collection> getConsumers() + { + + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + @Override public int compareTo(ServerSession o) { @@ -966,4 +1023,37 @@ public class ServerSession extends Session } } } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 2593c66191..040be92ceb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).register(destination, target); try { - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) { - options.add(Consumer.Option.ACQUIRES); + options.add(ConsumerImpl.Option.ACQUIRES); } if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(method.getExclusive()) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } - Consumer sub = + ((ServerSession)session).register( queue.addConsumer(target, filterManager, MessageTransferMessage.class, destination, - options); + options)); } catch (AMQQueue.ExistingExclusiveConsumer existing) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 780e7ad199..baf5eceef7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; @@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -173,6 +178,9 @@ public class AMQChannel> private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); private Subject _subject; + private final CopyOnWriteArrayList> _consumers = new CopyOnWriteArrayList>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); public AMQChannel(T session, int channelId, final MessageStore messageStore) @@ -526,7 +534,7 @@ public class AMQChannel> } - public Consumer getSubscription(AMQShortString tag) + public ConsumerImpl getSubscription(AMQShortString tag) { final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); return target == null ? null : target.getConsumer(); @@ -545,7 +553,7 @@ public class AMQChannel> * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * - * @throws AMQException if something goes wrong + * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) @@ -564,7 +572,7 @@ public class AMQChannel> } ConsumerTarget_0_8 target; - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { @@ -573,19 +581,19 @@ public class AMQChannel> else if(acks) { target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(exclusive) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } @@ -615,12 +623,19 @@ public class AMQChannel> } }); } - Consumer sub = + ConsumerImpl sub = source.addConsumer(target, filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); + AMQMessage.class, + AMQShortString.toString(tag), + options); + if(sub instanceof Consumer) + { + final Consumer modelConsumer = (Consumer) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } catch (AccessControlException e) { @@ -659,15 +674,19 @@ public class AMQChannel> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - Consumer sub = target == null ? null : target.getConsumer(); + ConsumerImpl sub = target == null ? null : target.getConsumer(); if (sub != null) { sub.close(); + if(sub instanceof Consumer) + { + _consumers.remove(sub); + } return true; } else { - _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."); } return false; } @@ -735,7 +754,7 @@ public class AMQChannel> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Consumer sub = me.getValue().getConsumer(); + ConsumerImpl sub = me.getValue().getConsumer(); if(sub != null) { @@ -754,7 +773,7 @@ public class AMQChannel> * delivery tag) * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) { if (_logger.isDebugEnabled()) { @@ -1126,7 +1145,7 @@ public class AMQChannel> for(MessageInstance entry : _resendList) { - Consumer sub = entry.getDeliveredConsumer(); + ConsumerImpl sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); @@ -1199,7 +1218,7 @@ public class AMQChannel> private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1658,4 +1677,71 @@ public class AMQChannel> { return _tag2SubscriptionTargetMap.size(); } + + @Override + public Collection> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } + + private void consumerAdded(final Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(final Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override + public void addConsumerListener(ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(ConsumerListener listener) + { + _consumerListeners.remove(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 2ebcde199b..a86530fe0e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; @@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Map> _channelMap = new HashMap>(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); @SuppressWarnings("unchecked") private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); - + sessionAdded(channel); if(_blocking) { channel.block(); @@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + private void sessionAdded(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; @@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ public void removeChannel(int channelId) { + AMQChannel session; synchronized (_channelMap) { - _channelMap.remove(channelId); - + session = _channelMap.remove(channelId); if ((channelId & CHANNEL_CACHE_SIZE) == channelId) { _cachedChannels[channelId] = null; } } + sessionRemoved(session); } /** @@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + public void setDeferFlush(boolean deferFlush) { _deferFlush = deferFlush; @@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { registerMessageDelivered(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index 9f8799f68e..fa26a73f93 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; public interface ClientDeliveryMethod { - void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, + void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 2ce8caefc9..3de89a1d70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -25,17 +25,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; @@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private Consumer _consumer; + private ConsumerImpl _consumer; public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 1de1638c2e..7a2fdb05fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { message.setRedelivered(); - final Consumer consumer = message.getDeliveredConsumer(); + final ConsumerImpl consumer = message.getDeliveredConsumer(); if (consumer != null) { // Consumer exists diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 70d7da3432..c13ff17f67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); + void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index f620abf30f..76b5cbbbb9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; @@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.AccessControlException; @@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES); + EnumSet options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); if(acks) { @@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener _referenceList = new LinkedList(); - private Consumer _consumer; + private ConsumerImpl _consumer; private boolean _queueDeleted; @Override @@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase _queue = mock(AMQQueue.class); when(_queue.getName()).thenReturn(getName()); when(_queue.isDeleted()).thenReturn(_queueDeleted); - _consumer = mock(Consumer.class); - when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); + _consumer = mock(ConsumerImpl.class); + when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); long id = 0; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index eaa5b6a7a5..18949bba50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; @@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Consumer sub, ServerMessage message, + public void deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 0a53a6436a..00c78581e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Object _reference = new Object(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); @@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _connectionId = connectionId; _subject.getPrincipals().add(new ConnectionPrincipal(this)); _subjectCreator = subjectCreator; - //_vhost.getConnectionRegistry().registerConnection(this); } @@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST); } _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + _vhost.getConnectionRegistry().registerConnection(this); + if(_vhost == null) { final Error err = new Error(); @@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { final Session_1_0 session = new Session_1_0(this, endpoint); _sessions.add(session); + sessionAdded(session); endpoint.setSessionEventListener(new SessionEventListener() { @Override @@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod void sessionEnded(Session_1_0 session) { _sessions.remove(session); + sessionRemoved(session); } public void removeDeleteTask(final Action task) @@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _vhost; } + + + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f3417710a5..adb2f8ea6a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget private Binary _transactionId; private final AMQPDescribedTypeRegistry _typeRegistry; private final SectionEncoder _sectionEncoder; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_1_0(final SendingLink_1_0 link, boolean acquires) @@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget _acquires = acquires; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 24395a6fad..eb1f75b771 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Consumer _consumer; + private ConsumerImpl _consumer; private ConsumerTarget_1_0 _target; private boolean _draining; @@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); boolean noLocal = false; @@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); if(source.getDistributionMode() != StdDistMode.COPY) { - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } } @@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, true); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else @@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(noLocal) { - options.add(Consumer.Option.NO_LOCAL); + options.add(ConsumerImpl.Option.NO_LOCAL); } try @@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public void resume(SendingLinkAttachment linkAttachment) { _linkAttachment = linkAttachment; - } public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) @@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _vhost; } + + public ConsumerImpl getConsumer() + { + return _consumer; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 411117be4d..e124b4d5ac 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; @@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel> _consumers = new CopyOnWriteArrayList>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel) + { + Consumer modelConsumer = (Consumer) consumer; + _consumers.add(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + consumerAdded(modelConsumer); + } + } + private AMQQueue createTemporaryQueue(Map properties) { @@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index a47506f804..788ce63c8f 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; @@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination final FilterManager filters, final Class messageClass, final String consumerName, - final EnumSet options) + final EnumSet options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); @@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 8a1f39fdfe..a3b1f932ac 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; @@ -33,9 +33,9 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -class ManagementNodeConsumer implements Consumer +class ManagementNodeConsumer implements ConsumerImpl { - private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); + private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List _queue = Collections.synchronizedList(new ArrayList()); private final ConsumerTarget _target; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 18c68bd198..ae2828d392 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); } @@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index baf92e8522..0947ae2a89 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer(); object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber()); ServerMessage message = entry.getMessage(); -- cgit v1.2.1