From b2023145c2b88ee458429663536cbab7ddd8f3b0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 8 Mar 2014 18:56:42 +0000 Subject: QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/binding/BindingImpl.java | 1 + .../org/apache/qpid/server/consumer/Consumer.java | 79 ------------- .../apache/qpid/server/consumer/ConsumerImpl.java | 79 +++++++++++++ .../qpid/server/consumer/ConsumerTarget.java | 4 +- .../apache/qpid/server/filter/FilterSupport.java | 7 +- .../qpid/server/message/MessageDestination.java | 1 - .../qpid/server/message/MessageInstance.java | 13 +-- .../apache/qpid/server/message/MessageSource.java | 12 +- .../qpid/server/message/MessageSourceConsumer.java | 28 +++++ .../server/model/adapter/ConnectionAdapter.java | 58 +++++----- .../qpid/server/model/adapter/SessionAdapter.java | 25 +++-- .../qpid/server/protocol/AMQConnectionModel.java | 5 + .../qpid/server/protocol/AMQSessionModel.java | 8 ++ .../qpid/server/protocol/ConsumerListener.java | 30 +++++ .../qpid/server/protocol/SessionModelListener.java | 27 +++++ .../org/apache/qpid/server/queue/AMQQueue.java | 3 + .../apache/qpid/server/queue/AbstractQueue.java | 63 +++++++++-- .../queue/AssignedConsumerMessageGroupManager.java | 1 - .../org/apache/qpid/server/queue/BaseQueue.java | 1 - .../queue/DefinedGroupMessageGroupManager.java | 4 +- .../apache/qpid/server/queue/QueueConsumer.java | 6 +- .../org/apache/qpid/server/queue/QueueEntry.java | 1 - .../apache/qpid/server/queue/QueueEntryImpl.java | 8 +- .../apache/qpid/server/queue/QueueEntryList.java | 1 - .../qpid/server/security/SecurityManager.java | 4 +- .../apache/qpid/server/consumer/MockConsumer.java | 39 ++++++- .../qpid/server/queue/AbstractQueueTestBase.java | 56 +++++----- .../apache/qpid/server/queue/ConsumerListTest.java | 4 +- .../qpid/server/queue/MockMessageInstance.java | 10 +- .../qpid/server/queue/PriorityQueueTest.java | 4 +- .../qpid/server/queue/QueueEntryListTestBase.java | 1 - .../qpid/server/queue/StandardQueueTest.java | 24 ++-- .../server/protocol/v0_10/ConsumerTarget_0_10.java | 12 +- .../server/protocol/v0_10/ServerConnection.java | 35 ++++++ .../qpid/server/protocol/v0_10/ServerSession.java | 90 +++++++++++++++ .../protocol/v0_10/ServerSessionDelegate.java | 14 +-- .../qpid/server/protocol/v0_8/AMQChannel.java | 124 +++++++++++++++++---- .../server/protocol/v0_8/AMQProtocolEngine.java | 43 ++++++- .../server/protocol/v0_8/ClientDeliveryMethod.java | 5 +- .../server/protocol/v0_8/ConsumerTarget_0_8.java | 13 +-- .../protocol/v0_8/ExtractResendAndRequeue.java | 4 +- .../server/protocol/v0_8/RecordDeliveryMethod.java | 4 +- .../v0_8/handler/BasicGetMethodHandler.java | 12 +- .../apache/qpid/server/protocol/v0_8/AckTest.java | 30 ++--- .../protocol/v0_8/ExtractResendAndRequeueTest.java | 8 +- .../protocol/v0_8/InternalTestProtocolSession.java | 4 +- .../qpid/server/protocol/v1_0/Connection_1_0.java | 41 ++++++- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 10 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 22 ++-- .../qpid/server/protocol/v1_0/Session_1_0.java | 89 ++++++++++++++- .../server/management/amqp/ManagementNode.java | 12 +- .../management/amqp/ManagementNodeConsumer.java | 6 +- .../server/management/amqp/ManagementResponse.java | 8 +- .../plugin/servlet/rest/MessageServlet.java | 4 +- 54 files changed, 876 insertions(+), 321 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 9fdae72188..0bba5c7967 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -94,6 +94,7 @@ public class BindingImpl //Perform ACLs queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + _queue.bindingCreated(this); _logSubject = new BindingLogSubject(_bindingKey,exchange,queue); getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java deleted file mode 100644 index 4cfcb01cf0..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.consumer; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.protocol.AMQSessionModel; - -public interface Consumer -{ - AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0); - - void externalStateChange(); - - enum Option - { - ACQUIRES, - SEES_REQUEUES, - TRANSIENT, - EXCLUSIVE, - NO_LOCAL - } - - long getBytesOut(); - - long getMessagesOut(); - - long getUnacknowledgedBytes(); - - long getUnacknowledgedMessages(); - - AMQSessionModel getSessionModel(); - - MessageSource getMessageSource(); - - long getConsumerNumber(); - - boolean isSuspended(); - - boolean isClosed(); - - boolean acquires(); - - boolean seesRequeues(); - - void close(); - - boolean trySendLock(); - - - void getSendLock(); - - void releaseSendLock(); - - boolean isActive(); - - String getName(); - - void flush(); -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java new file mode 100644 index 0000000000..b15b01ede5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.consumer; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.protocol.AMQSessionModel; + +public interface ConsumerImpl +{ + AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0); + + void externalStateChange(); + + enum Option + { + ACQUIRES, + SEES_REQUEUES, + TRANSIENT, + EXCLUSIVE, + NO_LOCAL + } + + long getBytesOut(); + + long getMessagesOut(); + + long getUnacknowledgedBytes(); + + long getUnacknowledgedMessages(); + + AMQSessionModel getSessionModel(); + + MessageSource getMessageSource(); + + long getConsumerNumber(); + + boolean isSuspended(); + + boolean isClosed(); + + boolean acquires(); + + boolean seesRequeues(); + + void close(); + + boolean trySendLock(); + + + void getSendLock(); + + void releaseSendLock(); + + boolean isActive(); + + String getName(); + + void flush(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 8f6b008f9b..faf5a724f3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -36,9 +36,9 @@ public interface ConsumerTarget State getState(); - void consumerAdded(Consumer sub); + void consumerAdded(ConsumerImpl sub); - void consumerRemoved(Consumer sub); + void consumerRemoved(ConsumerImpl sub); void setStateListener(StateChangeListener listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 043fa4a664..d0b1670a45 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -30,9 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -132,8 +131,8 @@ public class FilterSupport public boolean matches(Filterable message) { - final Collection consumers = _queue.getConsumers(); - for(Consumer c : consumers) + final Collection consumers = _queue.getConsumers(); + for(ConsumerImpl c : consumers) { if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 1913f11ae1..1a70f80eab 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 8c69f47367..4ee47e05e9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -21,10 +21,9 @@ package org.apache.qpid.server.message; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -50,23 +49,23 @@ public interface MessageInstance boolean acquiredByConsumer(); - boolean isAcquiredBy(Consumer consumer); + boolean isAcquiredBy(ConsumerImpl consumer); void setRedelivered(); boolean isRedelivered(); - Consumer getDeliveredConsumer(); + ConsumerImpl getDeliveredConsumer(); void reject(); - boolean isRejectedBy(Consumer consumer); + boolean isRejectedBy(ConsumerImpl consumer); boolean getDeliveredToConsumer(); boolean expired(); - boolean acquire(Consumer sub); + boolean acquire(ConsumerImpl sub); int getMaximumDeliveryCount(); @@ -160,7 +159,7 @@ public interface MessageInstance } } - public final class ConsumerAcquiredState extends EntryState + public final class ConsumerAcquiredState extends EntryState { private final C _consumer; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index 382a9df7e9..7f6629e33d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -31,13 +31,13 @@ import java.util.EnumSet; public interface MessageSource extends TransactionLogResource, MessageNode { - Consumer addConsumer(ConsumerTarget target, FilterManager filters, + ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters, Class messageClass, - String consumerName, EnumSet options) + String consumerName, EnumSet options) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused; - Collection getConsumers(); + Collection getConsumers(); void addConsumerRegistrationListener(ConsumerRegistrationListener listener); @@ -47,8 +47,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode interface ConsumerRegistrationListener { - void consumerAdded(Q source, Consumer consumer); - void consumerRemoved(Q queue, Consumer consumer); + void consumerAdded(Q source, ConsumerImpl consumer); + void consumerRemoved(Q queue, ConsumerImpl consumer); } /** diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java new file mode 100644 index 0000000000..7ca5c6da8f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.Consumer; + +public interface MessageSourceConsumer> extends ConsumerImpl, Consumer> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 89d5622121..0da93fa784 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -29,9 +29,11 @@ import org.apache.qpid.server.model.*; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.stats.StatisticsGatherer; -final class ConnectionAdapter extends AbstractConfiguredObject implements Connection +final class ConnectionAdapter extends AbstractConfiguredObject implements Connection, + SessionModelListener { private AMQConnectionModel _connection; @@ -61,6 +63,7 @@ final class ConnectionAdapter extends AbstractConfiguredObjectemptyMap(), createAttributes(conn), taskExecutor); _connection = conn; + conn.addSessionListener(this); } private static Map createAttributes(final AMQConnectionModel conn) @@ -146,33 +149,8 @@ final class ConnectionAdapter extends AbstractConfiguredObject getSessions() { - List actualSessions = _connection.getSessionModels(); - synchronized (_sessionAdapters) { - Iterator iterator = _sessionAdapters.keySet().iterator(); - while(iterator.hasNext()) - { - AMQSessionModel session = iterator.next(); - if(!actualSessions.contains(session)) - { - SessionAdapter adapter = _sessionAdapters.get(session); - iterator.remove(); - - childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback. - } - } - - for(AMQSessionModel session : actualSessions) - { - if(!_sessionAdapters.containsKey(session)) - { - SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); - _sessionAdapters.put(session, adapter); - childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback. - } - } - return new ArrayList(_sessionAdapters.values()); } } @@ -186,7 +164,6 @@ final class ConnectionAdapter extends AbstractConfiguredObject session) + { + synchronized (_sessionAdapters) + { + if(!_sessionAdapters.containsKey(session)) + { + SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); + _sessionAdapters.put(session, adapter); + childAdded(adapter); + } + } + } + + @Override + public void sessionRemoved(final AMQSessionModel session) + { + synchronized (_sessionAdapters) + { + SessionAdapter adapter = _sessionAdapters.remove(session); + if(adapter != null) + { + childRemoved(adapter); + } + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 95e408ddf5..19d174a157 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -29,10 +29,11 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.*; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.QueueConsumer; final class SessionAdapter extends AbstractConfiguredObject implements Session @@ -41,7 +42,6 @@ final class SessionAdapter extends AbstractConfiguredObject impl private AMQSessionModel _session; - private Map _consumerAdapters = new HashMap(); @ManagedAttributeField private int _channelId; @@ -50,6 +50,20 @@ final class SessionAdapter extends AbstractConfiguredObject impl { super(Collections.emptyMap(),createAttributes(session), taskExecutor); _session = session; + _session.addConsumerListener(new ConsumerListener() + { + @Override + public void consumerAdded(final Consumer consumer) + { + childAdded(consumer); + } + + @Override + public void consumerRemoved(final Consumer consumer) + { + childRemoved(consumer); + } + }); } private static Map createAttributes(final AMQSessionModel session) @@ -75,15 +89,12 @@ final class SessionAdapter extends AbstractConfiguredObject impl public Collection getConsumers() { - synchronized (_consumerAdapters) - { - return new ArrayList(_consumerAdapters.values()); - } + return (Collection) _session.getConsumers(); } public Collection getPublishers() { - return null; //TODO + return Collections.emptySet(); //TODO } public String setName(final String currentName, final String desiredName) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index d978705841..624e2cd280 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.transport.SessionListener; import java.net.SocketAddress; import java.security.Principal; @@ -98,4 +99,8 @@ public interface AMQConnectionModel, S extends String getVirtualHostName(); String getRemoteContainerName(); + + void addSessionListener(SessionModelListener listener); + + void removeSessionListener(SessionModelListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index db92f6950d..e7c14a7a4d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.protocol; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.Deletable; @@ -86,4 +88,10 @@ public interface AMQSessionModel, C extends AMQCo int getChannelId(); int getConsumerCount(); + + Collection> getConsumers(); + + void addConsumerListener(ConsumerListener listener); + + void removeConsumerListener(ConsumerListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java new file mode 100644 index 0000000000..7d797d05b8 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.server.model.Consumer; + +public interface ConsumerListener +{ + void consumerAdded(Consumer consumer); + + void consumerRemoved(Consumer consumer); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java new file mode 100644 index 0000000000..80a32682e2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface SessionModelListener +{ + void sessionAdded(AMQSessionModel session); + void sessionRemoved(AMQSessionModel session); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0db7d78576..b758365039 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -224,4 +225,6 @@ public interface AMQQueue> long getUnacknowledgedMessages(); + + void bindingCreated(Binding binding); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 63dfdeb4b8..8e791cd326 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageSource; @@ -54,7 +55,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -99,6 +99,7 @@ public abstract class AbstractQueue }; private final VirtualHost _virtualHost; + private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); /** null means shared */ private String _description; @@ -682,7 +683,7 @@ public abstract class AbstractQueue final FilterManager filters, final Class messageClass, final String consumerName, - EnumSet optionSet) + EnumSet optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { @@ -762,13 +763,13 @@ public abstract class AbstractQueue throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); } - boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT); - if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) + if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL)) { optionSet = EnumSet.copyOf(optionSet); - optionSet.add(Consumer.Option.NO_LOCAL); + optionSet.add(ConsumerImpl.Option.NO_LOCAL); } if(exclusive && getConsumerCount() != 0) @@ -822,6 +823,9 @@ public abstract class AbstractQueue // TODO } + childAdded(consumer); + consumer.addChangeListener(_deletedChildListener); + deliverAsync(consumer); return consumer; @@ -2598,7 +2602,7 @@ public abstract class AbstractQueue case CONTAINER: case CONNECTION: AMQSessionModel session = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(session == null) { @@ -2624,7 +2628,7 @@ public abstract class AbstractQueue case CONTAINER: case PRINCIPAL: AMQConnectionModel con = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(con == null) { @@ -2652,7 +2656,7 @@ public abstract class AbstractQueue case NONE: case PRINCIPAL: String containerID = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(containerID == null) { @@ -2683,7 +2687,7 @@ public abstract class AbstractQueue case NONE: case CONTAINER: Principal principal = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(principal == null) { @@ -3079,4 +3083,43 @@ public abstract class AbstractQueue } } + @Override + public void bindingCreated(final Binding binding) + { + childAdded(binding); + binding.addChangeListener(_deletedChildListener); + } + + private class DeletedChildListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + AbstractQueue.this.childRemoved(object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index 827d44dfb3..8220993e03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 3bc6cbd625..01594e68a9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index c64291faec..592e596a70 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; @@ -160,7 +160,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - Consumer assignedSub = group.getConsumer(); + ConsumerImpl assignedSub = group.getConsumer(); if(assignedSub == sub) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 660d4ec5d6..5ffbc0dbaa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.Consumer; -public interface QueueConsumer> extends Consumer, org.apache.qpid.server.model.Consumer +public interface QueueConsumer> extends ConsumerImpl, Consumer { void flushBatched(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 1e7c8030c1..d984cf8ab4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; public interface QueueEntry extends MessageInstance, Comparable diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e3b56d25b7..b62e100eea 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; @@ -29,7 +30,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { final boolean acquired = acquire(((QueueConsumer)sub).getOwningState()); if(acquired) @@ -217,7 +217,7 @@ public abstract class QueueEntryImpl implements QueueEntry return (_state instanceof ConsumerAcquiredState); } - public boolean isAcquiredBy(Consumer consumer) + public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; return state instanceof ConsumerAcquiredState @@ -295,7 +295,7 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Consumer consumer) + public boolean isRejectedBy(ConsumerImpl consumer) { if (_rejectedBy != null) // We have consumers that rejected this message diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 070b1990a6..d2687a33fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index ac0521b34e..dca3576827 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.security; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; @@ -354,7 +354,7 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void authoriseCreateConsumer(final Consumer consumer) + public void authoriseCreateConsumer(final ConsumerImpl consumer) { // TODO - remove cast to AMQQueue and allow testing of consumption from any MessageSource final AMQQueue queue = (AMQQueue) consumer.getMessageSource(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index eeb83a7148..0f5931b902 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -29,10 +29,13 @@ import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; @@ -41,9 +44,9 @@ import org.apache.qpid.server.util.StateChangeListener; import java.net.SocketAddress; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -175,12 +178,12 @@ public class MockConsumer implements ConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @@ -335,6 +338,24 @@ public class MockConsumer implements ConsumerTarget return 0; } + @Override + public Collection> getConsumers() + { + return null; + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + + } + @Override public void close(AMQConstant cause, String message) { @@ -474,6 +495,18 @@ public class MockConsumer implements ConsumerTarget return null; } + @Override + public void addSessionListener(final SessionModelListener listener) + { + + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + + } + @Override public String getClientVersion() { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 97365479fa..913f400a5f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -33,6 +33,7 @@ import java.util.*; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -46,7 +47,6 @@ import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; import org.apache.qpid.server.consumer.MockConsumer; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -158,8 +158,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding a consumer adds it to the queue _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, @@ -195,8 +195,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA, null); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", @@ -213,8 +213,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageA, null); _queue.enqueue(messageB, null); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", @@ -234,8 +234,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -281,8 +281,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final ArrayList queueEntries = new ArrayList(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -333,8 +333,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageC = createMessage(new Long(26)); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -386,12 +386,12 @@ abstract class AbstractQueueTestBase extends QpidTestCase QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList queueEntries = new ArrayList(); @@ -428,8 +428,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding an exclusive consumer adds it to the queue _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -454,8 +454,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase { _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); } catch (MessageSource.ExistingExclusiveConsumer e) @@ -468,14 +468,14 @@ abstract class AbstractQueueTestBase extends QpidTestCase // existing consumer _consumer.close(); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); try { _consumer = (QueueConsumer) _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE)); } catch (MessageSource.ExistingConsumerPreventsExclusive e) @@ -492,7 +492,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage message = createMessage(id); _consumer = (QueueConsumer) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); _queue.enqueue(message, new Action() { @@ -677,11 +677,11 @@ abstract class AbstractQueueTestBase extends QpidTestCase // register the consumers testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); //check that no messages have been delivered to the //consumers during registration diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java index 4a762afbdb..3f5093a03a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.test.utils.QpidTestCase; import static org.mockito.Mockito.mock; @@ -146,7 +146,7 @@ public class ConsumerListTest extends QpidTestCase * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given * Consumer, or null if none is found. */ - private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub) + private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final ConsumerImpl sub) { QueueConsumerList.ConsumerNode node = list.getHead(); while (node != null && node.getConsumer() != sub) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 7c89f9855c..74a2262265 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -60,7 +60,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -76,7 +76,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } @@ -86,7 +86,7 @@ public class MockMessageInstance implements MessageInstance return false; } - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -116,7 +116,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 9c8449ce5f..09cec61909 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Collections; import junit.framework.AssertionFailedError; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.EnumSet; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.when; @@ -65,7 +65,7 @@ public class PriorityQueueTest extends AbstractQueueTestBase queue.enqueue(createMessage(9L, (byte) 0), null); // Register subscriber - queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class)); + queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class)); Thread.sleep(150); ArrayList msgs = getConsumer().getMessages(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 39d53637e4..c89d2abeae 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -24,7 +24,6 @@ import junit.framework.TestCase; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 4116be1954..1627de22f2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.message.MessageInstance; @@ -57,8 +57,8 @@ public class StandardQueueTest extends AbstractQueueTestBase ServerMessage message = createMessage(25l); QueueConsumer consumer = (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); getQueue().enqueue(message, null); consumer.close(); @@ -84,8 +84,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify adding an inactive consumer doesn't increase the count @@ -97,8 +97,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify behaviour in face of expected state changes: @@ -151,8 +151,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // put test messages into a queue putGivenNumberOfMessages(queue, 4); @@ -219,8 +219,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, entries.get(0).getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -335,7 +335,7 @@ public class StandardQueueTest extends AbstractQueueTestBase } @Override - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { if(_message.getMessageNumber() % 2 == 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index eeafb30642..a3fabf076c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; @@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_0_10(ServerSession session, @@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { consumer.getSendLock(); @@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index a3a80415ac..5e899aa635 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel> _taskList = new CopyOnWriteArrayList>(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + @Override public String getClientVersion() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index c2eacfe6e8..0bb3008d13 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -108,6 +114,9 @@ public class ServerSession extends Session private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + public static interface MessageDispositionChangeListener { @@ -133,6 +142,7 @@ public class ServerSession extends Session private final AtomicLong _txnCount = new AtomicLong(0); private Map _subscriptions = new ConcurrentHashMap(); + private final CopyOnWriteArrayList> _consumers = new CopyOnWriteArrayList>(); private final List> _taskList = new CopyOnWriteArrayList>(); @@ -458,6 +468,18 @@ public class ServerSession extends Session _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } + + public void register(final ConsumerImpl consumerImpl) + { + if(consumerImpl instanceof Consumer) + { + final Consumer consumer = (Consumer) consumerImpl; + _consumers.add(consumer); + consumer.addChangeListener(_consumerClosedListener); + consumerAdded(consumer); + } + } + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); @@ -948,6 +970,41 @@ public class ServerSession extends Session return _subscriptions.values().size(); } + @Override + public Collection> getConsumers() + { + + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + @Override public int compareTo(ServerSession o) { @@ -966,4 +1023,37 @@ public class ServerSession extends Session } } } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 2593c66191..040be92ceb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).register(destination, target); try { - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) { - options.add(Consumer.Option.ACQUIRES); + options.add(ConsumerImpl.Option.ACQUIRES); } if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(method.getExclusive()) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } - Consumer sub = + ((ServerSession)session).register( queue.addConsumer(target, filterManager, MessageTransferMessage.class, destination, - options); + options)); } catch (AMQQueue.ExistingExclusiveConsumer existing) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 780e7ad199..baf5eceef7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; @@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -173,6 +178,9 @@ public class AMQChannel> private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); private Subject _subject; + private final CopyOnWriteArrayList> _consumers = new CopyOnWriteArrayList>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); public AMQChannel(T session, int channelId, final MessageStore messageStore) @@ -526,7 +534,7 @@ public class AMQChannel> } - public Consumer getSubscription(AMQShortString tag) + public ConsumerImpl getSubscription(AMQShortString tag) { final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); return target == null ? null : target.getConsumer(); @@ -545,7 +553,7 @@ public class AMQChannel> * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * - * @throws AMQException if something goes wrong + * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) @@ -564,7 +572,7 @@ public class AMQChannel> } ConsumerTarget_0_8 target; - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { @@ -573,19 +581,19 @@ public class AMQChannel> else if(acks) { target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(exclusive) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } @@ -615,12 +623,19 @@ public class AMQChannel> } }); } - Consumer sub = + ConsumerImpl sub = source.addConsumer(target, filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); + AMQMessage.class, + AMQShortString.toString(tag), + options); + if(sub instanceof Consumer) + { + final Consumer modelConsumer = (Consumer) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } catch (AccessControlException e) { @@ -659,15 +674,19 @@ public class AMQChannel> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - Consumer sub = target == null ? null : target.getConsumer(); + ConsumerImpl sub = target == null ? null : target.getConsumer(); if (sub != null) { sub.close(); + if(sub instanceof Consumer) + { + _consumers.remove(sub); + } return true; } else { - _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."); } return false; } @@ -735,7 +754,7 @@ public class AMQChannel> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Consumer sub = me.getValue().getConsumer(); + ConsumerImpl sub = me.getValue().getConsumer(); if(sub != null) { @@ -754,7 +773,7 @@ public class AMQChannel> * delivery tag) * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) { if (_logger.isDebugEnabled()) { @@ -1126,7 +1145,7 @@ public class AMQChannel> for(MessageInstance entry : _resendList) { - Consumer sub = entry.getDeliveredConsumer(); + ConsumerImpl sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); @@ -1199,7 +1218,7 @@ public class AMQChannel> private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1658,4 +1677,71 @@ public class AMQChannel> { return _tag2SubscriptionTargetMap.size(); } + + @Override + public Collection> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } + + private void consumerAdded(final Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(final Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override + public void addConsumerListener(ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(ConsumerListener listener) + { + _consumerListeners.remove(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 2ebcde199b..a86530fe0e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; @@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Map> _channelMap = new HashMap>(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); @SuppressWarnings("unchecked") private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); - + sessionAdded(channel); if(_blocking) { channel.block(); @@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + private void sessionAdded(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; @@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ public void removeChannel(int channelId) { + AMQChannel session; synchronized (_channelMap) { - _channelMap.remove(channelId); - + session = _channelMap.remove(channelId); if ((channelId & CHANNEL_CACHE_SIZE) == channelId) { _cachedChannels[channelId] = null; } } + sessionRemoved(session); } /** @@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + public void setDeferFlush(boolean deferFlush) { _deferFlush = deferFlush; @@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { registerMessageDelivered(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index 9f8799f68e..fa26a73f93 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; public interface ClientDeliveryMethod { - void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, + void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 2ce8caefc9..3de89a1d70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -25,17 +25,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; @@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private Consumer _consumer; + private ConsumerImpl _consumer; public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 1de1638c2e..7a2fdb05fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { message.setRedelivered(); - final Consumer consumer = message.getDeliveredConsumer(); + final ConsumerImpl consumer = message.getDeliveredConsumer(); if (consumer != null) { // Consumer exists diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 70d7da3432..c13ff17f67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); + void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index f620abf30f..76b5cbbbb9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; @@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.AccessControlException; @@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES); + EnumSet options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); if(acks) { @@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener _referenceList = new LinkedList(); - private Consumer _consumer; + private ConsumerImpl _consumer; private boolean _queueDeleted; @Override @@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase _queue = mock(AMQQueue.class); when(_queue.getName()).thenReturn(getName()); when(_queue.isDeleted()).thenReturn(_queueDeleted); - _consumer = mock(Consumer.class); - when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); + _consumer = mock(ConsumerImpl.class); + when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); long id = 0; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index eaa5b6a7a5..18949bba50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; @@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Consumer sub, ServerMessage message, + public void deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 0a53a6436a..00c78581e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Object _reference = new Object(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList _sessionListeners = + new CopyOnWriteArrayList(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); @@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _connectionId = connectionId; _subject.getPrincipals().add(new ConnectionPrincipal(this)); _subjectCreator = subjectCreator; - //_vhost.getConnectionRegistry().registerConnection(this); } @@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST); } _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + _vhost.getConnectionRegistry().registerConnection(this); + if(_vhost == null) { final Error err = new Error(); @@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { final Session_1_0 session = new Session_1_0(this, endpoint); _sessions.add(session); + sessionAdded(session); endpoint.setSessionEventListener(new SessionEventListener() { @Override @@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod void sessionEnded(Session_1_0 session) { _sessions.remove(session); + sessionRemoved(session); } public void removeDeleteTask(final Action task) @@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _vhost; } + + + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f3417710a5..adb2f8ea6a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget private Binary _transactionId; private final AMQPDescribedTypeRegistry _typeRegistry; private final SectionEncoder _sectionEncoder; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_1_0(final SendingLink_1_0 link, boolean acquires) @@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget _acquires = acquires; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 24395a6fad..eb1f75b771 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Consumer _consumer; + private ConsumerImpl _consumer; private ConsumerTarget_1_0 _target; private boolean _draining; @@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - EnumSet options = EnumSet.noneOf(Consumer.Option.class); + EnumSet options = EnumSet.noneOf(ConsumerImpl.Option.class); boolean noLocal = false; @@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); if(source.getDistributionMode() != StdDistMode.COPY) { - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } } @@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, true); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else @@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(noLocal) { - options.add(Consumer.Option.NO_LOCAL); + options.add(ConsumerImpl.Option.NO_LOCAL); } try @@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public void resume(SendingLinkAttachment linkAttachment) { _linkAttachment = linkAttachment; - } public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) @@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _vhost; } + + public ConsumerImpl getConsumer() + { + return _consumer; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 411117be4d..e124b4d5ac 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; @@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel> _consumers = new CopyOnWriteArrayList>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel) + { + Consumer modelConsumer = (Consumer) consumer; + _consumers.add(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + consumerAdded(modelConsumer); + } + } + private AMQQueue createTemporaryQueue(Map properties) { @@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index a47506f804..788ce63c8f 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; @@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination final FilterManager filters, final Class messageClass, final String consumerName, - final EnumSet options) + final EnumSet options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); @@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 8a1f39fdfe..a3b1f932ac 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; @@ -33,9 +33,9 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -class ManagementNodeConsumer implements Consumer +class ManagementNodeConsumer implements ConsumerImpl { - private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); + private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List _queue = Collections.synchronizedList(new ArrayList()); private final ConsumerTarget _target; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 18c68bd198..ae2828d392 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); } @@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index baf92e8522..0947ae2a89 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer(); object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber()); ServerMessage message = entry.getMessage(); -- cgit v1.2.1