diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:30 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:30 +0000 |
| commit | ea0c222a318baf1cd71eb363f1244e04ca3c74de (patch) | |
| tree | 4af3254114f350d7ea80751b63ae599ccd622b17 /qpid/cpp/src | |
| parent | ba3b6c53f4072744aecbac429c8eab66631d84c6 (diff) | |
| download | qpid-python-ea0c222a318baf1cd71eb363f1244e04ca3c74de.tar.gz | |
QPID-3603: Add ConfigurationObserver.
Allows plugins to observe configuration events queue create/destroy,
exchange create/destroy and bind/unbind.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348114 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConfigurationObserver.h | 61 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConfigurationObservers.h | 72 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionObservers.h | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Observers.h | 69 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 3 |
10 files changed, 230 insertions, 29 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 63c8b0e1e2..d021843f16 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -549,6 +549,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ConsumerFactory.h \ qpid/broker/ConnectionObserver.h \ qpid/broker/ConnectionObservers.h \ + qpid/broker/ConfigurationObserver.h \ + qpid/broker/ConfigurationObservers.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ @@ -611,6 +613,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ + qpid/broker/Observers.h \ qpid/broker/OwnershipToken.h \ qpid/broker/Persistable.h \ qpid/broker/PersistableConfig.h \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index dd4baf9992..6745204043 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1219,6 +1219,7 @@ void Broker::bind(const std::string& queueName, throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); } else { if (queue->bind(exchange, key, arguments)) { + getConfigurationObservers().bind(exchange, queue, key, arguments); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, queueName, key, ManagementAgent::toMap(arguments))); @@ -1254,6 +1255,8 @@ void Broker::unbind(const std::string& queueName, if (exchange->isDurable() && queue->isDurable()) { store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); } + getConfigurationObservers().unbind( + exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 7095383959..22a35c0929 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -39,6 +39,7 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" +#include "qpid/broker/ConfigurationObservers.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -184,6 +185,7 @@ class Broker : public sys::Runnable, public Plugin::Target, AclModule* acl; DataDir dataDir; ConnectionObservers connectionObservers; + ConfigurationObservers configurationObservers; QueueRegistry queues; ExchangeRegistry exchanges; @@ -210,7 +212,6 @@ class Broker : public sys::Runnable, public Plugin::Target, mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; - public: QPID_BROKER_EXTERN virtual ~Broker(); @@ -381,6 +382,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } + ConfigurationObservers& getConfigurationObservers() { return configurationObservers; } /** Properties to be set on outgoing link connections */ QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h new file mode 100644 index 0000000000..701043db40 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_CONFIGURATIONOBSERVER_H +#define QPID_BROKER_CONFIGURATIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include <string> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { +class Queue; +class Exchange; + + +/** + * Observer for changes to configuration (aka wiring) + */ +class ConfigurationObserver +{ + public: + virtual ~ConfigurationObserver() {} + virtual void queueCreate(const boost::shared_ptr<Queue>&) {} + virtual void queueDestroy(const boost::shared_ptr<Queue>&) {} + virtual void exchangeCreate(const boost::shared_ptr<Exchange>&) {} + virtual void exchangeDestroy(const boost::shared_ptr<Exchange>&) {} + virtual void bind(const boost::shared_ptr<Exchange>& , + const boost::shared_ptr<Queue>& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} + virtual void unbind(const boost::shared_ptr<Exchange>&, + const boost::shared_ptr<Queue>& , + const std::string& /*key*/, + const framing::FieldTable& /*args*/) {} +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONFIGURATIONOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObservers.h b/qpid/cpp/src/qpid/broker/ConfigurationObservers.h new file mode 100644 index 0000000000..4c1159747d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ConfigurationObservers.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_CONFIGURATIONOBSERVERS_H +#define QPID_BROKER_CONFIGURATIONOBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConfigurationObserver.h" +#include "Observers.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace broker { + +/** + * A configuration observer that delegates to a collection of + * configuration observers. + * + * THREAD SAFE + */ +class ConfigurationObservers : public ConfigurationObserver, + public Observers<ConfigurationObserver> +{ + public: + void queueCreate(const boost::shared_ptr<Queue>& q) { + each(boost::bind(&ConfigurationObserver::queueCreate, _1, q)); + } + void queueDestroy(const boost::shared_ptr<Queue>& q) { + each(boost::bind(&ConfigurationObserver::queueDestroy, _1, q)); + } + void exchangeCreate(const boost::shared_ptr<Exchange>& e) { + each(boost::bind(&ConfigurationObserver::exchangeCreate, _1, e)); + } + void exchangeDestroy(const boost::shared_ptr<Exchange>& e) { + each(boost::bind(&ConfigurationObserver::exchangeDestroy, _1, e)); + } + void bind(const boost::shared_ptr<Exchange>& exchange, + const boost::shared_ptr<Queue>& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &ConfigurationObserver::bind, _1, exchange, queue, key, args)); + } + void unbind(const boost::shared_ptr<Exchange>& exchange, + const boost::shared_ptr<Queue>& queue, + const std::string& key, + const framing::FieldTable& args) { + each(boost::bind( + &ConfigurationObserver::unbind, _1, exchange, queue, key, args)); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONFIGURATIONOBSERVERS_H*/ diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h index 07e515f3c9..e9014c80c3 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h +++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h @@ -23,9 +23,7 @@ */ #include "ConnectionObserver.h" -#include "qpid/sys/Mutex.h" -#include <set> -#include <algorithm> +#include "Observers.h" namespace qpid { namespace broker { @@ -35,18 +33,10 @@ namespace broker { * Calling a ConnectionObserver function will call that function on each observer. * THREAD SAFE. */ -class ConnectionObservers : public ConnectionObserver { +class ConnectionObservers : public ConnectionObserver, + public Observers<ConnectionObserver> +{ public: - void add(boost::shared_ptr<ConnectionObserver> observer) { - sys::Mutex::ScopedLock l(lock); - observers.insert(observer); - } - - void remove(boost::shared_ptr<ConnectionObserver> observer) { - sys::Mutex::ScopedLock l(lock); - observers.erase(observer); - } - void connection(Connection& c) { each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c))); } @@ -62,16 +52,6 @@ class ConnectionObservers : public ConnectionObserver { void forced(Connection& c, const std::string& text) { each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text)); } - - private: - typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers; - sys::Mutex lock; - Observers observers; - - template <class F> void each(F f) { - sys::Mutex::ScopedLock l(lock); - std::for_each(observers.begin(), observers.end(), f); - } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 43d7268dfb..dde59d41c1 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -19,6 +19,7 @@ * */ +#include "qpid/broker/Broker.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" @@ -69,6 +70,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c exchange = i->second(name, durable, args, parent, broker); } } + if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); exchanges[name] = exchange; return std::pair<Exchange::shared_ptr, bool>(exchange, true); } else { @@ -85,8 +87,10 @@ void ExchangeRegistry::destroy(const string& name){ RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i != exchanges.end()) { + Exchange::shared_ptr ex = i->second; i->second->destroy(); exchanges.erase(i); + if (broker) broker->getConfigurationObservers().exchangeDestroy(ex); } } diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h new file mode 100644 index 0000000000..c62f75d6d0 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Observers.h @@ -0,0 +1,69 @@ +#ifndef QPID_BROKER_OBSERVERS_H +#define QPID_BROKER_OBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> +#include <vector> +#include <algorithm> + +namespace qpid { +namespace broker { + +/** + * Base class for collections of observers with thread-safe add/remove and traversal. + */ +template <class Observer> +class Observers +{ + public: + void add(boost::shared_ptr<Observer> observer) { + sys::Mutex::ScopedLock l(lock); + observers.push_back(observer); + } + + void remove(boost::shared_ptr<Observer> observer) { + sys::Mutex::ScopedLock l(lock); + typename List::iterator i = std::find(observers.begin(), observers.end(), observer); + observers.erase(i); + } + + protected: + typedef std::vector<boost::shared_ptr<Observer> > List; + + sys::Mutex lock; + List observers; + + template <class F> void each(F f) { + List copy; + { + sys::Mutex::ScopedLock l(lock); + copy = observers; + } + std::for_each(copy.begin(), copy.end(), f); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_OBSERVERS_H*/ diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 236d5ae34c..6647774168 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueEvents.h" @@ -64,17 +65,22 @@ QueueRegistry::declare(const string& declareName, bool durable, //i.e. recovering a queue for which we already have a persistent record queue->configure(arguments); } + if (broker) broker->getConfigurationObservers().queueCreate(queue); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); - return std::pair<Queue::shared_ptr, bool>(queue, true); } else { return std::pair<Queue::shared_ptr, bool>(i->second, false); } } -void QueueRegistry::destroyLH (const string& name){ - queues.erase(name); +void QueueRegistry::destroyLH (const string& name) { + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + Queue::shared_ptr q = i->second; + queues.erase(i); + if (broker) broker->getConfigurationObservers().queueDestroy(q); + } } void QueueRegistry::destroy (const string& name){ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 021e4d559e..6095837cd6 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -94,10 +94,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); statusChanged(l); - QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo); + QPID_LOG(notice, logPrefix << "Broker starting: " << brokerInfo); } HaBroker::~HaBroker() { + QPID_LOG(debug, logPrefix << "Broker shut down: " << brokerInfo); broker.getConnectionObservers().remove(excluder); } |
