summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-08 18:56:42 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-08 18:56:42 +0000
commitb2023145c2b88ee458429663536cbab7ddd8f3b0 (patch)
tree259f7ed1e2372025c7a65338abc3a58ef6b88e74 /qpid/java
parent19b2671cbd4af77ac52c222605c09b06cab7ced6 (diff)
downloadqpid-python-b2023145c2b88ee458429663536cbab7ddd8f3b0.tar.gz
QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java)2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java63
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java1
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java39
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java56
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java24
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java35
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java90
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java124
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java43
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java30
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java41
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java89
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java12
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java4
53 files changed, 798 insertions, 243 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 9fdae72188..0bba5c7967 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -94,6 +94,7 @@ public class BindingImpl
//Perform ACLs
queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+ _queue.bindingCreated(this);
_logSubject = new BindingLogSubject(_bindingKey,exchange,queue);
getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()),
getArguments() != null
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 4cfcb01cf0..b15b01ede5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
-public interface Consumer
+public interface ConsumerImpl
{
AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 8f6b008f9b..faf5a724f3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -36,9 +36,9 @@ public interface ConsumerTarget
State getState();
- void consumerAdded(Consumer sub);
+ void consumerAdded(ConsumerImpl sub);
- void consumerRemoved(Consumer sub);
+ void consumerRemoved(ConsumerImpl sub);
void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 043fa4a664..d0b1670a45 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -30,9 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
public class FilterSupport
@@ -132,8 +131,8 @@ public class FilterSupport
public boolean matches(Filterable message)
{
- final Collection<? extends Consumer> consumers = _queue.getConsumers();
- for(Consumer c : consumers)
+ final Collection<? extends ConsumerImpl> consumers = _queue.getConsumers();
+ for(ConsumerImpl c : consumers)
{
if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 1913f11ae1..1a70f80eab 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.message;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 8c69f47367..4ee47e05e9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -21,10 +21,9 @@
package org.apache.qpid.server.message;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -50,23 +49,23 @@ public interface MessageInstance
boolean acquiredByConsumer();
- boolean isAcquiredBy(Consumer consumer);
+ boolean isAcquiredBy(ConsumerImpl consumer);
void setRedelivered();
boolean isRedelivered();
- Consumer getDeliveredConsumer();
+ ConsumerImpl getDeliveredConsumer();
void reject();
- boolean isRejectedBy(Consumer consumer);
+ boolean isRejectedBy(ConsumerImpl consumer);
boolean getDeliveredToConsumer();
boolean expired();
- boolean acquire(Consumer sub);
+ boolean acquire(ConsumerImpl sub);
int getMaximumDeliveryCount();
@@ -160,7 +159,7 @@ public interface MessageInstance
}
}
- public final class ConsumerAcquiredState<C extends Consumer> extends EntryState
+ public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
private final C _consumer;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 382a9df7e9..7f6629e33d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.message;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -31,13 +31,13 @@ import java.util.EnumSet;
public interface MessageSource extends TransactionLogResource, MessageNode
{
- Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+ ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters,
Class<? extends ServerMessage> messageClass,
- String consumerName, EnumSet<Consumer.Option> options)
+ String consumerName, EnumSet<ConsumerImpl.Option> options)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused;
- Collection<? extends Consumer> getConsumers();
+ Collection<? extends ConsumerImpl> getConsumers();
void addConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener);
@@ -47,8 +47,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode
interface ConsumerRegistrationListener<Q extends MessageSource>
{
- void consumerAdded(Q source, Consumer consumer);
- void consumerRemoved(Q queue, Consumer consumer);
+ void consumerAdded(Q source, ConsumerImpl consumer);
+ void consumerRemoved(Q queue, ConsumerImpl consumer);
}
/**
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
new file mode 100644
index 0000000000..7ca5c6da8f
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.model.Consumer;
+
+public interface MessageSourceConsumer<X extends MessageSourceConsumer<X>> extends ConsumerImpl, Consumer<MessageSourceConsumer<X>>
+{
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 89d5622121..0da93fa784 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -29,9 +29,11 @@ import org.apache.qpid.server.model.*;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.stats.StatisticsGatherer;
-final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>
+final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>,
+ SessionModelListener
{
private AMQConnectionModel _connection;
@@ -61,6 +63,7 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter
{
super(Collections.<String,Object>emptyMap(), createAttributes(conn), taskExecutor);
_connection = conn;
+ conn.addSessionListener(this);
}
private static Map<String, Object> createAttributes(final AMQConnectionModel conn)
@@ -146,33 +149,8 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter
public Collection<Session> getSessions()
{
- List<AMQSessionModel> actualSessions = _connection.getSessionModels();
-
synchronized (_sessionAdapters)
{
- Iterator<AMQSessionModel> iterator = _sessionAdapters.keySet().iterator();
- while(iterator.hasNext())
- {
- AMQSessionModel session = iterator.next();
- if(!actualSessions.contains(session))
- {
- SessionAdapter adapter = _sessionAdapters.get(session);
- iterator.remove();
-
- childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback.
- }
- }
-
- for(AMQSessionModel session : actualSessions)
- {
- if(!_sessionAdapters.containsKey(session))
- {
- SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor());
- _sessionAdapters.put(session, adapter);
- childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback.
- }
- }
-
return new ArrayList<Session>(_sessionAdapters.values());
}
}
@@ -186,7 +164,6 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter
{
synchronized (_sessionAdapters)
{
- getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions.
return _sessionAdapters.get(session);
}
}
@@ -359,4 +336,31 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter
{
return _connection.getSessionModels().size();
}
+
+ @Override
+ public void sessionAdded(final AMQSessionModel<?, ?> session)
+ {
+ synchronized (_sessionAdapters)
+ {
+ if(!_sessionAdapters.containsKey(session))
+ {
+ SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor());
+ _sessionAdapters.put(session, adapter);
+ childAdded(adapter);
+ }
+ }
+ }
+
+ @Override
+ public void sessionRemoved(final AMQSessionModel<?, ?> session)
+ {
+ synchronized (_sessionAdapters)
+ {
+ SessionAdapter adapter = _sessionAdapters.remove(session);
+ if(adapter != null)
+ {
+ childRemoved(adapter);
+ }
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 95e408ddf5..19d174a157 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -29,10 +29,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.queue.QueueConsumer;
final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
@@ -41,7 +42,6 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
private AMQSessionModel _session;
- private Map<Consumer, QueueConsumer> _consumerAdapters = new HashMap<Consumer, QueueConsumer>();
@ManagedAttributeField
private int _channelId;
@@ -50,6 +50,20 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
{
super(Collections.<String,Object>emptyMap(),createAttributes(session), taskExecutor);
_session = session;
+ _session.addConsumerListener(new ConsumerListener()
+ {
+ @Override
+ public void consumerAdded(final Consumer<?> consumer)
+ {
+ childAdded(consumer);
+ }
+
+ @Override
+ public void consumerRemoved(final Consumer<?> consumer)
+ {
+ childRemoved(consumer);
+ }
+ });
}
private static Map<String, Object> createAttributes(final AMQSessionModel session)
@@ -75,15 +89,12 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
- synchronized (_consumerAdapters)
- {
- return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
- }
+ return (Collection<Consumer>) _session.getConsumers();
}
public Collection<Publisher> getPublishers()
{
- return null; //TODO
+ return Collections.emptySet(); //TODO
}
public String setName(final String currentName, final String desiredName)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index d978705841..624e2cd280 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.transport.SessionListener;
import java.net.SocketAddress;
import java.security.Principal;
@@ -98,4 +99,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
String getVirtualHostName();
String getRemoteContainerName();
+
+ void addSessionListener(SessionModelListener listener);
+
+ void removeSessionListener(SessionModelListener listener);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index db92f6950d..e7c14a7a4d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.Deletable;
@@ -86,4 +88,10 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
int getChannelId();
int getConsumerCount();
+
+ Collection<Consumer<?>> getConsumers();
+
+ void addConsumerListener(ConsumerListener listener);
+
+ void removeConsumerListener(ConsumerListener listener);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
new file mode 100644
index 0000000000..7d797d05b8
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.server.model.Consumer;
+
+public interface ConsumerListener
+{
+ void consumerAdded(Consumer<?> consumer);
+
+ void consumerRemoved(Consumer<?> consumer);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java
new file mode 100644
index 0000000000..80a32682e2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface SessionModelListener
+{
+ void sessionAdded(AMQSessionModel<?,?> session);
+ void sessionRemoved(AMQSessionModel<?,?> session);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0db7d78576..b758365039 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -224,4 +225,6 @@ public interface AMQQueue<X extends AMQQueue<X>>
long getUnacknowledgedMessages();
+
+ void bindingCreated(Binding<BindingImpl> binding);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 63dfdeb4b8..8e791cd326 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -36,6 +36,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageSource;
@@ -54,7 +55,6 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -99,6 +99,7 @@ public abstract class AbstractQueue
};
private final VirtualHost _virtualHost;
+ private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
/** null means shared */
private String _description;
@@ -682,7 +683,7 @@ public abstract class AbstractQueue
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<Consumer.Option> optionSet)
+ EnumSet<ConsumerImpl.Option> optionSet)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
@@ -762,13 +763,13 @@ public abstract class AbstractQueue
throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
}
- boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+ boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT);
- if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL))
+ if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL))
{
optionSet = EnumSet.copyOf(optionSet);
- optionSet.add(Consumer.Option.NO_LOCAL);
+ optionSet.add(ConsumerImpl.Option.NO_LOCAL);
}
if(exclusive && getConsumerCount() != 0)
@@ -822,6 +823,9 @@ public abstract class AbstractQueue
// TODO
}
+ childAdded(consumer);
+ consumer.addChangeListener(_deletedChildListener);
+
deliverAsync(consumer);
return consumer;
@@ -2598,7 +2602,7 @@ public abstract class AbstractQueue
case CONTAINER:
case CONNECTION:
AMQSessionModel session = null;
- for(Consumer c : getConsumers())
+ for(ConsumerImpl c : getConsumers())
{
if(session == null)
{
@@ -2624,7 +2628,7 @@ public abstract class AbstractQueue
case CONTAINER:
case PRINCIPAL:
AMQConnectionModel con = null;
- for(Consumer c : getConsumers())
+ for(ConsumerImpl c : getConsumers())
{
if(con == null)
{
@@ -2652,7 +2656,7 @@ public abstract class AbstractQueue
case NONE:
case PRINCIPAL:
String containerID = null;
- for(Consumer c : getConsumers())
+ for(ConsumerImpl c : getConsumers())
{
if(containerID == null)
{
@@ -2683,7 +2687,7 @@ public abstract class AbstractQueue
case NONE:
case CONTAINER:
Principal principal = null;
- for(Consumer c : getConsumers())
+ for(ConsumerImpl c : getConsumers())
{
if(principal == null)
{
@@ -3079,4 +3083,43 @@ public abstract class AbstractQueue
}
}
+ @Override
+ public void bindingCreated(final Binding<BindingImpl> binding)
+ {
+ childAdded(binding);
+ binding.addChangeListener(_deletedChildListener);
+ }
+
+ private class DeletedChildListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ AbstractQueue.this.childRemoved(object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
index 827d44dfb3..8220993e03 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index 3bc6cbd625..01594e68a9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index c64291faec..592e596a70 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
@@ -160,7 +160,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- Consumer assignedSub = group.getConsumer();
+ ConsumerImpl assignedSub = group.getConsumer();
if(assignedSub == sub)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 660d4ec5d6..5ffbc0dbaa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Consumer;
-public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer, org.apache.qpid.server.model.Consumer<X>
+public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, Consumer<X>
{
void flushBatched();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 1e7c8030c1..d984cf8ab4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index e3b56d25b7..b62e100eea 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
@@ -29,7 +30,6 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
- public boolean acquire(Consumer sub)
+ public boolean acquire(ConsumerImpl sub)
{
final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
if(acquired)
@@ -217,7 +217,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Consumer consumer)
+ public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
return state instanceof ConsumerAcquiredState
@@ -295,7 +295,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public boolean isRejectedBy(Consumer consumer)
+ public boolean isRejectedBy(ConsumerImpl consumer)
{
if (_rejectedBy != null) // We have consumers that rejected this message
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 070b1990a6..d2687a33fd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.ServerMessage;
public interface QueueEntryList
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index ac0521b34e..dca3576827 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -21,7 +21,7 @@ package org.apache.qpid.server.security;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.*;
@@ -354,7 +354,7 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
- public void authoriseCreateConsumer(final Consumer consumer)
+ public void authoriseCreateConsumer(final ConsumerImpl consumer)
{
// TODO - remove cast to AMQQueue and allow testing of consumption from any MessageSource
final AMQQueue queue = (AMQQueue) consumer.getMessageSource();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index eeb83a7148..0f5931b902 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -29,10 +29,13 @@ import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
@@ -41,9 +44,9 @@ import org.apache.qpid.server.util.StateChangeListener;
import java.net.SocketAddress;
import java.security.Principal;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -175,12 +178,12 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
@@ -336,6 +339,24 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+ return null;
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+
+ }
+
+ @Override
public void close(AMQConstant cause, String message)
{
}
@@ -475,6 +496,18 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+
+ }
+
+ @Override
public String getClientVersion()
{
return null;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 97365479fa..913f400a5f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -33,6 +33,7 @@ import java.util.*;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -46,7 +47,6 @@ import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.consumer.MockConsumer;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -158,8 +158,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
// Check adding a consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
@@ -195,8 +195,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
@@ -213,8 +213,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(messageA, null);
_queue.enqueue(messageB, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
@@ -234,8 +234,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -281,8 +281,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage messageA = createMessage(new Long(24));
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -333,8 +333,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage messageC = createMessage(new Long(26));
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -386,12 +386,12 @@ abstract class AbstractQueueTestBase extends QpidTestCase
QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -428,8 +428,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
// Check adding an exclusive consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -454,8 +454,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
}
catch (MessageSource.ExistingExclusiveConsumer e)
@@ -468,14 +468,14 @@ abstract class AbstractQueueTestBase extends QpidTestCase
// existing consumer
_consumer.close();
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
try
{
_consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
- EnumSet.of(Consumer.Option.EXCLUSIVE));
+ EnumSet.of(ConsumerImpl.Option.EXCLUSIVE));
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
@@ -492,7 +492,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage message = createMessage(id);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
_queue.enqueue(message, new Action<MessageInstance>()
{
@@ -677,11 +677,11 @@ abstract class AbstractQueueTestBase extends QpidTestCase
// register the consumers
testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
//consumers during registration
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
index 4a762afbdb..3f5093a03a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.test.utils.QpidTestCase;
import static org.mockito.Mockito.mock;
@@ -146,7 +146,7 @@ public class ConsumerListTest extends QpidTestCase
* Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given
* Consumer, or null if none is found.
*/
- private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub)
+ private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final ConsumerImpl sub)
{
QueueConsumerList.ConsumerNode node = list.getHead();
while (node != null && node.getConsumer() != sub)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 7c89f9855c..74a2262265 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -60,7 +60,7 @@ public class MockMessageInstance implements MessageInstance
}
@Override
- public boolean isAcquiredBy(final Consumer consumer)
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
}
@@ -76,7 +76,7 @@ public class MockMessageInstance implements MessageInstance
}
@Override
- public boolean acquire(final Consumer sub)
+ public boolean acquire(final ConsumerImpl sub)
{
return false;
}
@@ -86,7 +86,7 @@ public class MockMessageInstance implements MessageInstance
return false;
}
- public Consumer getDeliveredConsumer()
+ public ConsumerImpl getDeliveredConsumer()
{
return null;
}
@@ -116,7 +116,7 @@ public class MockMessageInstance implements MessageInstance
}
@Override
- public boolean isRejectedBy(final Consumer consumer)
+ public boolean isRejectedBy(final ConsumerImpl consumer)
{
return false;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index 9c8449ce5f..09cec61909 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import java.util.Collections;
import junit.framework.AssertionFailedError;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -31,7 +32,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.when;
@@ -65,7 +65,7 @@ public class PriorityQueueTest extends AbstractQueueTestBase
queue.enqueue(createMessage(9L, (byte) 0), null);
// Register subscriber
- queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class));
+ queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class));
Thread.sleep(150);
ArrayList<MessageInstance> msgs = getConsumer().getMessages();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index 39d53637e4..c89d2abeae 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -24,7 +24,6 @@ import junit.framework.TestCase;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 4116be1954..1627de22f2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.message.MessageInstance;
@@ -57,8 +57,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
ServerMessage message = createMessage(25l);
QueueConsumer consumer =
(QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
getQueue().enqueue(message, null);
consumer.close();
@@ -84,8 +84,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
//verify adding an inactive consumer doesn't increase the count
@@ -97,8 +97,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
//verify behaviour in face of expected state changes:
@@ -151,8 +151,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
null,
createMessage(-1l).getClass(),
"test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
@@ -219,8 +219,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
null,
entries.get(0).getMessage().getClass(),
"test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -335,7 +335,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
}
@Override
- public boolean acquire(Consumer sub)
+ public boolean acquire(ConsumerImpl sub)
{
if(_message.getMessageNumber() % 2 == 0)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index eeafb30642..a3fabf076c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
@@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public ConsumerTarget_0_10(ServerSession session,
@@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
boolean closed = false;
State state = getState();
- final Consumer consumer = getConsumer();
+ final ConsumerImpl consumer = getConsumer();
if(consumer != null)
{
consumer.getSendLock();
@@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index a3a80415ac..5e899aa635 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
public synchronized void registerSession(final Session ssn)
{
super.registerSession(ssn);
+ sessionAdded((ServerSession)ssn);
if(_blocking)
{
((ServerSession)ssn).block();
@@ -392,6 +397,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
@Override
public synchronized void removeSession(final Session ssn)
{
+ sessionRemoved((ServerSession)ssn);
super.removeSession(ssn);
}
@@ -552,6 +558,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index c2eacfe6e8..0bb3008d13 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -108,6 +114,9 @@ public class ServerSession extends Session
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+
public static interface MessageDispositionChangeListener
{
@@ -133,6 +142,7 @@ public class ServerSession extends Session
private final AtomicLong _txnCount = new AtomicLong(0);
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
@@ -458,6 +468,18 @@ public class ServerSession extends Session
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
+
+ public void register(final ConsumerImpl consumerImpl)
+ {
+ if(consumerImpl instanceof Consumer<?>)
+ {
+ final Consumer<?> consumer = (Consumer<?>) consumerImpl;
+ _consumers.add(consumer);
+ consumer.addChangeListener(_consumerClosedListener);
+ consumerAdded(consumer);
+ }
+ }
+
public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
@@ -949,6 +971,41 @@ public class ServerSession extends Session
}
@Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
+
+ private void consumerAdded(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ @Override
public int compareTo(ServerSession o)
{
return getId().compareTo(o.getId());
@@ -966,4 +1023,37 @@ public class ServerSession extends Session
}
}
}
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
+ {
+ if(newState == org.apache.qpid.server.model.State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 2593c66191..040be92ceb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).register(destination, target);
try
{
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
- options.add(Consumer.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
if(method.getExclusive())
{
- options.add(Consumer.Option.EXCLUSIVE);
+ options.add(ConsumerImpl.Option.EXCLUSIVE);
}
- Consumer sub =
+ ((ServerSession)session).register(
queue.addConsumer(target,
filterManager,
MessageTransferMessage.class,
destination,
- options);
+ options));
}
catch (AMQQueue.ExistingExclusiveConsumer existing)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 780e7ad199..baf5eceef7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -33,6 +33,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.Filterable;
@@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -173,6 +178,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
private Subject _subject;
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
public AMQChannel(T session, int channelId, final MessageStore messageStore)
@@ -526,7 +534,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
- public Consumer getSubscription(AMQShortString tag)
+ public ConsumerImpl getSubscription(AMQShortString tag)
{
final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
return target == null ? null : target.getConsumer();
@@ -545,7 +553,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
- * @throws AMQException if something goes wrong
+ * @throws org.apache.qpid.AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
@@ -564,7 +572,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
ConsumerTarget_0_8 target;
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
@@ -573,19 +581,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
else if(acks)
{
target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
{
target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
if(exclusive)
{
- options.add(Consumer.Option.EXCLUSIVE);
+ options.add(ConsumerImpl.Option.EXCLUSIVE);
}
@@ -615,12 +623,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
});
}
- Consumer sub =
+ ConsumerImpl sub =
source.addConsumer(target,
filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options);
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
+ if(sub instanceof Consumer<?>)
+ {
+ final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ consumerAdded(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ _consumers.add(modelConsumer);
+ }
}
catch (AccessControlException e)
{
@@ -659,15 +674,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Consumer sub = target == null ? null : target.getConsumer();
+ ConsumerImpl sub = target == null ? null : target.getConsumer();
if (sub != null)
{
sub.close();
+ if(sub instanceof Consumer<?>)
+ {
+ _consumers.remove(sub);
+ }
return true;
}
else
{
- _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+ _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered.");
}
return false;
}
@@ -735,7 +754,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Consumer sub = me.getValue().getConsumer();
+ ConsumerImpl sub = me.getValue().getConsumer();
if(sub != null)
{
@@ -754,7 +773,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
* delivery tag)
* @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer)
{
if (_logger.isDebugEnabled())
{
@@ -1126,7 +1145,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
for(MessageInstance entry : _resendList)
{
- Consumer sub = entry.getDeliveredConsumer();
+ ConsumerImpl sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
@@ -1199,7 +1218,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1658,4 +1677,71 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
return _tag2SubscriptionTargetMap.size();
}
+
+ @Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
+
+ private void consumerAdded(final Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(final Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ @Override
+ public void addConsumerListener(ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 2ebcde199b..a86530fe0e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
@SuppressWarnings("unchecked")
private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
@@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
synchronized (_channelMap)
{
_channelMap.put(channel.getChannelId(), channel);
-
+ sessionAdded(channel);
if(_blocking)
{
channel.block();
@@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
@@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*/
public void removeChannel(int channelId)
{
+ AMQChannel<AMQProtocolEngine> session;
synchronized (_channelMap)
{
- _channelMap.remove(channelId);
-
+ session = _channelMap.remove(channelId);
if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
{
_cachedChannels[channelId] = null;
}
}
+ sessionRemoved(session);
}
/**
@@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return String.valueOf(getContextKey());
}
+ @Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
public void setDeferFlush(boolean deferFlush)
{
_deferFlush = deferFlush;
@@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final Consumer sub, final ServerMessage message,
+ public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
registerMessageDelivered(message.getSize());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index 9f8799f68e..fa26a73f93 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.consumer.Consumer;
public interface ClientDeliveryMethod
{
- void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
+ void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 2ce8caefc9..3de89a1d70 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -25,17 +25,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
boolean closed = false;
State state = getState();
- final Consumer consumer = getConsumer();
+ final ConsumerImpl consumer = getConsumer();
if(consumer != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 1de1638c2e..7a2fdb05fc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
message.setRedelivered();
- final Consumer consumer = message.getDeliveredConsumer();
+ final ConsumerImpl consumer = message.getDeliveredConsumer();
if (consumer != null)
{
// Consumer exists
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
index 70d7da3432..c13ff17f67 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.consumer.Consumer;
public interface RecordDeliveryMethod
{
- void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
+ void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index f620abf30f..76b5cbbbb9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
@@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.AccessControlException;
@@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
ConsumerTarget_0_8 target;
- EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
if(acks)
{
@@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
return(getDeliveryMethod.hasDeliveredMessage());
@@ -202,7 +202,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
@Override
- public void deliverToClient(final Consumer sub, final ServerMessage message,
+ public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 8d7de4cd93..e5cfced4e2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -26,13 +26,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,7 +49,7 @@ import java.util.Set;
public class AckTest extends QpidTestCase
{
private ConsumerTarget_0_8 _subscriptionTarget;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private AMQProtocolSession _protocolSession;
@@ -176,8 +176,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -209,8 +209,8 @@ public class AckTest extends QpidTestCase
null,
AMQMessage.class,
DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -232,7 +232,7 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -255,8 +255,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -292,8 +292,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -326,8 +326,8 @@ public class AckTest extends QpidTestCase
null,
new LimitlessCreditManager());
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -360,8 +360,8 @@ public class AckTest extends QpidTestCase
_subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
_consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Consumer.Option.SEES_REQUEUES,
- Consumer.Option.ACQUIRES));
+ EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
+ ConsumerImpl.Option.ACQUIRES));
final int msgCount = 1;
publishMessages(msgCount);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index f18da87d09..6d3e648369 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol.v0_8;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.Consumer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private boolean _queueDeleted;
@Override
@@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase
_queue = mock(AMQQueue.class);
when(_queue.getName()).thenReturn(getName());
when(_queue.isDeleted()).thenReturn(_queueDeleted);
- _consumer = mock(Consumer.class);
- when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
+ _consumer = mock(ConsumerImpl.class);
+ when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
long id = 0;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index eaa5b6a7a5..18949bba50 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
@Override
- public void deliverToClient(Consumer sub, ServerMessage message,
+ public void deliverToClient(ConsumerImpl sub, ServerMessage message,
InstanceProperties props, long deliveryTag)
{
_deliveryCount.incrementAndGet();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 0a53a6436a..00c78581e1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -51,6 +53,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
@@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final Object _reference = new Object();
private final Subject _subject = new Subject();
+ private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
+ new CopyOnWriteArrayList<SessionModelListener>();
private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
@@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_connectionId = connectionId;
_subject.getPrincipals().add(new ConnectionPrincipal(this));
_subjectCreator = subjectCreator;
- //_vhost.getConnectionRegistry().registerConnection(this);
}
@@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
}
_vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+ _vhost.getConnectionRegistry().registerConnection(this);
+
if(_vhost == null)
{
final Error err = new Error();
@@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
final Session_1_0 session = new Session_1_0(this, endpoint);
_sessions.add(session);
+ sessionAdded(session);
endpoint.setSessionEventListener(new SessionEventListener()
{
@Override
@@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
void sessionEnded(Session_1_0 session)
{
_sessions.remove(session);
+ sessionRemoved(session);
}
public void removeDeleteTask(final Action<? super Connection_1_0> task)
@@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
return _vhost;
}
+
+
+ @Override
+ public void addSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.add(listener);
+ }
+
+ @Override
+ public void removeSessionListener(final SessionModelListener listener)
+ {
+ _sessionListeners.remove(listener);
+ }
+
+ private void sessionAdded(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionAdded(session);
+ }
+ }
+
+ private void sessionRemoved(final AMQSessionModel<?,?> session)
+ {
+ for(SessionModelListener l : _sessionListeners)
+ {
+ l.sessionRemoved(session);
+ }
+ }
+
+
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index f3417710a5..adb2f8ea6a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
@@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
_acquires = acquires;
}
- public Consumer getConsumer()
+ public ConsumerImpl getConsumer()
{
return _consumer;
}
@@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
@Override
- public void consumerAdded(final Consumer sub)
+ public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
}
@Override
- public void consumerRemoved(final Consumer sub)
+ public void consumerRemoved(final ConsumerImpl sub)
{
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 24395a6fad..eb1f75b771 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private VirtualHost _vhost;
private SendingDestination _destination;
- private Consumer _consumer;
+ private ConsumerImpl _consumer;
private ConsumerTarget_1_0 _target;
private boolean _draining;
@@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
boolean noLocal = false;
@@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
if(source.getDistributionMode() != StdDistMode.COPY)
{
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
}
@@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_target = new ConsumerTarget_1_0(this, true);
- options.add(Consumer.Option.ACQUIRES);
- options.add(Consumer.Option.SEES_REQUEUES);
+ options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
@@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
if(noLocal)
{
- options.add(Consumer.Option.NO_LOCAL);
+ options.add(ConsumerImpl.Option.NO_LOCAL);
}
try
@@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public void resume(SendingLinkAttachment linkAttachment)
{
_linkAttachment = linkAttachment;
-
}
public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
@@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
return _vhost;
}
+
+ public ConsumerImpl getConsumer()
+ {
+ return _consumer;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 411117be4d..e124b4d5ac 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.*;
import org.apache.qpid.protocol.AMQConstant;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private AtomicBoolean _closed = new AtomicBoolean();
private final Subject _subject = new Subject();
+ private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+ private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+ private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
);
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+ registerConsumer(sendingLink.getConsumer());
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -383,6 +389,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
}
}
+ private void registerConsumer(final ConsumerImpl consumer)
+ {
+ if(consumer instanceof Consumer<?>)
+ {
+ Consumer<?> modelConsumer = (Consumer<?>) consumer;
+ _consumers.add(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ consumerAdded(modelConsumer);
+ }
+ }
+
private AMQQueue createTemporaryQueue(Map properties)
{
@@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
@Override
public int getConsumerCount()
{
- // TODO
- return 0;
+ return getConsumers().size();
}
+
public String toLogString()
{
long connectionId = getConnectionModel().getConnectionId();
@@ -785,4 +802,72 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
});
}
}
+
+
+ @Override
+ public Collection<Consumer<?>> getConsumers()
+ {
+ return Collections.unmodifiableCollection(_consumers);
+ }
+
+ @Override
+ public void addConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerListener(final ConsumerListener listener)
+ {
+ _consumerListeners.remove(listener);
+ }
+
+ private void consumerAdded(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerAdded(consumer);
+ }
+ }
+
+ private void consumerRemoved(Consumer<?> consumer)
+ {
+ for(ConsumerListener l : _consumerListeners)
+ {
+ l.consumerRemoved(consumer);
+ }
+ }
+
+ private class ConsumerClosedListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
+ {
+ if(newState == org.apache.qpid.server.model.State.DELETED)
+ {
+ consumerRemoved((Consumer<?>)object);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index a47506f804..788ce63c8f 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
@@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<Consumer.Option> options)
+ final EnumSet<ConsumerImpl.Option> options)
{
final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
@@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isAcquiredBy(final Consumer consumer)
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
}
@@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public Consumer getDeliveredConsumer()
+ public ConsumerImpl getDeliveredConsumer()
{
return null;
}
@@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean isRejectedBy(final Consumer consumer)
+ public boolean isRejectedBy(final ConsumerImpl consumer)
{
return false;
}
@@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
- public boolean acquire(final Consumer sub)
+ public boolean acquire(final ConsumerImpl sub)
{
return false;
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 8a1f39fdfe..a3b1f932ac 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -33,9 +33,9 @@ import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-class ManagementNodeConsumer implements Consumer
+class ManagementNodeConsumer implements ConsumerImpl
{
- private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
+ private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 18c68bd198..ae2828d392 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean isAcquiredBy(final Consumer consumer)
+ public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
}
@@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean isRejectedBy(final Consumer consumer)
+ public boolean isRejectedBy(final ConsumerImpl consumer)
{
return false;
}
@@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance
}
@Override
- public boolean acquire(final Consumer sub)
+ public boolean acquire(final ConsumerImpl sub)
{
return false;
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index baf92e8522..0947ae2a89 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.consumer.Consumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet
: entry.isAcquired()
? "Acquired"
: "");
- final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+ final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer();
object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber());
ServerMessage message = entry.getMessage();