diff options
author | Fraser Adams <fadams@apache.org> | 2013-04-26 14:25:52 +0000 |
---|---|---|
committer | Fraser Adams <fadams@apache.org> | 2013-04-26 14:25:52 +0000 |
commit | cf474845241b0c206710dbd9c87fe2e752c512a0 (patch) | |
tree | c2d6b10b6c84d99efd468f9e72a6c186203d9d79 | |
parent | 393d974b276c99138a46437da0bf57189e3fc730 (diff) | |
download | qpid-python-cf474845241b0c206710dbd9c87fe2e752c512a0.tar.gz |
QPID-4760: Associate Java Broker QueueAdapter and SessionAdapter via ConsumerAdapter
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476219 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 155 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 84f99e1f17..2ecd9a6431 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -68,20 +68,37 @@ final class ConnectionAdapter extends AbstractAdapter implements Connection { if(!actualSessions.contains(session)) { - _sessionAdapters.remove(session); + SessionAdapter adapter = _sessionAdapters.remove(session); + childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback. } } for(AMQSessionModel session : actualSessions) { if(!_sessionAdapters.containsKey(session)) { - _sessionAdapters.put(session, new SessionAdapter(session, getTaskExecutor())); + SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); + _sessionAdapters.put(session, adapter); + childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback. } } return new ArrayList<Session>(_sessionAdapters.values()); } } + /** + * Retrieve the SessionAdapter instance keyed by the AMQSessionModel from this Connection. + * @param session the AMQSessionModel used to index the SessionAdapter. + * @return the requested SessionAdapter. + */ + SessionAdapter getSessionAdapter(AMQSessionModel session) + { + synchronized (_sessionAdapters) + { + getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions. + return _sessionAdapters.get(session); + } + } + public void delete() { try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index e6d3fab2f8..4633605256 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -37,9 +37,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer { private final Subscription _subscription; private final QueueAdapter _queue; + private final SessionAdapter _session; private final ConsumerStatistics _statistics; - public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription) + public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter, + final Subscription subscription) { super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(), queueAdapter.getName(), @@ -48,6 +50,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer subscription.getConsumerName()), queueAdapter.getTaskExecutor()); _subscription = subscription; _queue = queueAdapter; + _session = sessionAdapter; _statistics = new ConsumerStatistics(); //TODO } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index f3ddf32e5a..8ac869900c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -34,13 +34,17 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.MapValueConverter; @@ -91,6 +95,38 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs _queue.setNotificationListener(this); } + /** + * Helper method to retrieve the SessionAdapter keyed by the AMQSessionModel. + * This method first finds the ConnectionAdapter associated with the Session from this QueueAdapter's parent vhost + * then it does a lookup on that to find the SessionAdapter keyed by the requested AMQSessionModel instance. + * @param session the AMQSessionModel used to index the SessionAdapter. + * @return the requested SessionAdapter or null if it can't be found. + */ + private SessionAdapter getSessionAdapter(AMQSessionModel session) + { + // Retrieve the ConnectionModel associated with the SessionModel as a key to lookup the ConnectionAdapter. + AMQConnectionModel connectionKey = session.getConnectionModel(); + + // Lookup the ConnectionAdapter, from which we should be able to retrieve the SessionAdapter we really want. + ConnectionAdapter connectionAdapter = _vhost.getConnectionAdapter(connectionKey); + if (connectionAdapter == null) + { + return null; // If we can't find an associated ConnectionAdapter the SessionAdapter is a lost cause. + } + else + { // With a good ConnectionAdapter we can finally try to find the SessionAdapter we are actually looking for. + SessionAdapter sessionAdapter = connectionAdapter.getSessionAdapter(session); + if (sessionAdapter == null) + { + return null; // If the SessionAdapter isn't associated with the selected ConnectionAdapter give up. + } + else + { + return sessionAdapter; + } + } + } + private void populateConsumers() { Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers(); @@ -102,7 +138,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { if(!_consumerAdapters.containsKey(subscription)) { - _consumerAdapters.put(subscription, new ConsumerAdapter(this, subscription)); + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription); + _consumerAdapters.put(subscription, adapter); + if (sessionAdapter != null) + { // Register ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionRegistered(subscription, adapter); + } } } } @@ -571,9 +613,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { if(!_consumerAdapters.containsKey(subscription)) { - adapter = new ConsumerAdapter(this, subscription); - _consumerAdapters.put(subscription,adapter); - // TODO - register with session + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + adapter = new ConsumerAdapter(this, sessionAdapter, subscription); + _consumerAdapters.put(subscription, adapter); + if (sessionAdapter != null) + { // Register ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionRegistered(subscription, adapter); + } } } if(adapter != null) @@ -589,10 +635,14 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs synchronized (_consumerAdapters) { adapter = _consumerAdapters.remove(subscription); - // TODO - register with session } if(adapter != null) { + SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + if (sessionAdapter != null) + { // Unregister ConsumerAdapter with the SessionAdapter. + sessionAdapter.subscriptionUnregistered(subscription); + } childRemoved(adapter); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 2fffdb32f8..550e8cecd6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.model.adapter; import java.security.AccessControlException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -34,6 +36,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -44,6 +47,7 @@ final class SessionAdapter extends AbstractAdapter implements Session private AMQSessionModel _session; private SessionStatistics _statistics; + private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>(); public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor) { @@ -54,7 +58,10 @@ final class SessionAdapter extends AbstractAdapter implements Session public Collection<Consumer> getSubscriptions() { - return null; //TODO + synchronized (_consumerAdapters) + { + return new ArrayList<Consumer>(_consumerAdapters.values()); + } } public Collection<Publisher> getPublishers() @@ -111,6 +118,37 @@ final class SessionAdapter extends AbstractAdapter implements Session return 0; //TODO } + /** + * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. + * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + * @param adapter the registered ConsumerAdapter. + */ + void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter) + { + synchronized (_consumerAdapters) + { + _consumerAdapters.put(subscription, adapter); + } + childAdded(adapter); + } + + /** + * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. + * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + */ + void subscriptionUnregistered(Subscription subscription) + { + ConsumerAdapter adapter = null; + synchronized (_consumerAdapters) + { + adapter = _consumerAdapters.remove(subscription); + } + if (adapter != null) + { + childRemoved(adapter); + } + } + @Override public Collection<String> getAttributeNames() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 2a66763272..97bb492484 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -214,6 +214,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } + /** + * Retrieve the ConnectionAdapter instance keyed by the AMQConnectionModel from this VirtualHost. + * @param connection the AMQConnectionModel used to index the ConnectionAdapter. + * @return the requested ConnectionAdapter. + */ + ConnectionAdapter getConnectionAdapter(AMQConnectionModel connection) + { + synchronized (_connectionAdapters) + { + return _connectionAdapters.get(connection); + } + } + public Collection<Queue> getQueues() { synchronized(_queueAdapters) @@ -644,6 +657,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(adapter != null) { + // Call getSessions() first to ensure that any SessionAdapter children are cleanly removed and any + // corresponding ConfigurationChangeListener childRemoved() callback is called for child SessionAdapters. + adapter.getSessions(); + childRemoved(adapter); } } diff --git a/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java b/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java index 43b6bd9a1d..48c59ba476 100644 --- a/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java +++ b/qpid/tools/src/java/src/qpid-broker-plugins-management-qmf2/java/org/apache/qpid/server/qmf2/QmfManagementAgent.java @@ -169,6 +169,8 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getSubscribeSchema()); _agent.registerEventClass(org.apache.qpid.server.qmf2.agentdata.Subscription.getUnsubscribeSchema()); + _agent.registerObjectClass(org.apache.qpid.server.qmf2.agentdata.Session.getSchema()); + // Initialise QmfAgentData Objects and track changes to the broker Management Objects. registerConfigurationChangeListeners(); } @@ -176,10 +178,12 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent catch (QmfException qmfe) { _log.info("QmfException {} caught in QmfManagementAgent Constructor", qmfe.getMessage()); + _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message. } catch (Exception e) { _log.info("Exception {} caught in QmfManagementAgent Constructor", e.getMessage()); + _agent = null; // Causes isConnected() to be false and thus prevents the "QMF2 Management Ready" message. } } @@ -214,7 +218,7 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent for (VirtualHost vhost : _broker.getVirtualHosts()) { // We don't add QmfAgentData VirtualHost objects. Possibly TODO, but it's a bit awkward at the moment - // becase (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line + // because (as of Qpid 0.20) the C++ broker doesn't *seem* to do much with them and the command line // tools such as qpid-config don't appear to be VirtualHost aware. A way to stay compatible is to // mark queues, exchanges etc with [vhost:<vhost-name>/]<object-name> (see Constructor comments). vhost.addChangeListener(this); @@ -227,7 +231,6 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent { childAdded(connection, session); - // session.getSubscriptions() returns null in Qpid 0.23 TODO fix that. if (session.getSubscriptions() != null) { for (Consumer subscription : session.getSubscriptions()) @@ -330,7 +333,7 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent * QMF2 Management Object if one doesn't already exist. In most cases it's a one-to-one mapping, but for * Binding for example the Binding child is added to both Queue and Exchange so we only create the Binding * QMF2 Management Object once and add the queueRef and exchangeRef reference properties referencing the Queue - * and Exchange parent Objects respectively. + * and Exchange parent Objects respectively, Similarly for Consumer (AKA Subscription). * <p> * This method is also responsible for raising the appropriate QMF2 Events when Management Objects are created. * @param object the parent object that the child is being added to. @@ -362,8 +365,16 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent agentConnection = false; // Only ignore the first Connection, which is the one from the Agent. } else if (child instanceof Session) - { // TODO - + { + if (!_objects.containsKey(child)) + { + QmfAgentData ref = _objects.get(object); // Get the Connection QmfAgentData so we can get connectionRef. + if (ref != null) + { + data = new org.apache.qpid.server.qmf2.agentdata.Session((Session)child, ref.getObjectId()); + _objects.put(child, data); + } + } } else if (child instanceof Exchange) { @@ -449,17 +460,15 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent { subscription.setQueueRef(ref.getObjectId(), (Queue)object); // Raise a Subscribe Event - N.B. Need to do it *after* we've set the queueRef. - _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Subscription)data).createSubscribeEvent()); + _agent.raiseEvent(subscription.createSubscribeEvent()); } - else if (object instanceof Session) // Won't get called in Qpid 0.20. - { // TODO the association between Session and Subscription isn't implemented in the 0.20 Java Broker. - //System.out.println("subscription.setSessionRef"); + else if (object instanceof Session) + { subscription.setSessionRef(ref.getObjectId()); } } } - try { // If we've created new QmfAgentData we register it with the Agent. @@ -503,8 +512,8 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent _agent.raiseEvent(((org.apache.qpid.server.qmf2.agentdata.Connection)data).createClientDisconnectEvent()); } else if (child instanceof Session) - { // TODO - + { + // no-op, don't need to do anything specific when Session is removed. } else if (child instanceof Exchange) { @@ -588,5 +597,4 @@ public class QmfManagementAgent implements ConfigurationChangeListener, QmfEvent } } } - } |