summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-08 15:24:30 +0000
committerAlan Conway <aconway@apache.org>2012-06-08 15:24:30 +0000
commitea0c222a318baf1cd71eb363f1244e04ca3c74de (patch)
tree4af3254114f350d7ea80751b63ae599ccd622b17 /qpid/cpp/src
parentba3b6c53f4072744aecbac429c8eab66631d84c6 (diff)
downloadqpid-python-ea0c222a318baf1cd71eb363f1244e04ca3c74de.tar.gz
QPID-3603: Add ConfigurationObserver.
Allows plugins to observe configuration events queue create/destroy, exchange create/destroy and bind/unbind. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348114 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h4
-rw-r--r--qpid/cpp/src/qpid/broker/ConfigurationObserver.h61
-rw-r--r--qpid/cpp/src/qpid/broker/ConfigurationObservers.h72
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionObservers.h28
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Observers.h69
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp3
10 files changed, 230 insertions, 29 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 63c8b0e1e2..d021843f16 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -549,6 +549,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ConsumerFactory.h \
qpid/broker/ConnectionObserver.h \
qpid/broker/ConnectionObservers.h \
+ qpid/broker/ConfigurationObserver.h \
+ qpid/broker/ConfigurationObservers.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
qpid/broker/Deliverable.h \
@@ -611,6 +613,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
+ qpid/broker/Observers.h \
qpid/broker/OwnershipToken.h \
qpid/broker/Persistable.h \
qpid/broker/PersistableConfig.h \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index dd4baf9992..6745204043 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1219,6 +1219,7 @@ void Broker::bind(const std::string& queueName,
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
} else {
if (queue->bind(exchange, key, arguments)) {
+ getConfigurationObservers().bind(exchange, queue, key, arguments);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
queueName, key, ManagementAgent::toMap(arguments)));
@@ -1254,6 +1255,8 @@ void Broker::unbind(const std::string& queueName,
if (exchange->isDurable() && queue->isDurable()) {
store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
}
+ getConfigurationObservers().unbind(
+ exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 7095383959..22a35c0929 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -39,6 +39,7 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/ConfigurationObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -184,6 +185,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
AclModule* acl;
DataDir dataDir;
ConnectionObservers connectionObservers;
+ ConfigurationObservers configurationObservers;
QueueRegistry queues;
ExchangeRegistry exchanges;
@@ -210,7 +212,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
-
public:
QPID_BROKER_EXTERN virtual ~Broker();
@@ -381,6 +382,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ ConfigurationObservers& getConfigurationObservers() { return configurationObservers; }
/** Properties to be set on outgoing link connections */
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObserver.h b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
new file mode 100644
index 0000000000..701043db40
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_CONFIGURATIONOBSERVER_H
+#define QPID_BROKER_CONFIGURATIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+class Queue;
+class Exchange;
+
+
+/**
+ * Observer for changes to configuration (aka wiring)
+ */
+class ConfigurationObserver
+{
+ public:
+ virtual ~ConfigurationObserver() {}
+ virtual void queueCreate(const boost::shared_ptr<Queue>&) {}
+ virtual void queueDestroy(const boost::shared_ptr<Queue>&) {}
+ virtual void exchangeCreate(const boost::shared_ptr<Exchange>&) {}
+ virtual void exchangeDestroy(const boost::shared_ptr<Exchange>&) {}
+ virtual void bind(const boost::shared_ptr<Exchange>& ,
+ const boost::shared_ptr<Queue>& ,
+ const std::string& /*key*/,
+ const framing::FieldTable& /*args*/) {}
+ virtual void unbind(const boost::shared_ptr<Exchange>&,
+ const boost::shared_ptr<Queue>& ,
+ const std::string& /*key*/,
+ const framing::FieldTable& /*args*/) {}
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONFIGURATIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConfigurationObservers.h b/qpid/cpp/src/qpid/broker/ConfigurationObservers.h
new file mode 100644
index 0000000000..4c1159747d
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ConfigurationObservers.h
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_CONFIGURATIONOBSERVERS_H
+#define QPID_BROKER_CONFIGURATIONOBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConfigurationObserver.h"
+#include "Observers.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A configuration observer that delegates to a collection of
+ * configuration observers.
+ *
+ * THREAD SAFE
+ */
+class ConfigurationObservers : public ConfigurationObserver,
+ public Observers<ConfigurationObserver>
+{
+ public:
+ void queueCreate(const boost::shared_ptr<Queue>& q) {
+ each(boost::bind(&ConfigurationObserver::queueCreate, _1, q));
+ }
+ void queueDestroy(const boost::shared_ptr<Queue>& q) {
+ each(boost::bind(&ConfigurationObserver::queueDestroy, _1, q));
+ }
+ void exchangeCreate(const boost::shared_ptr<Exchange>& e) {
+ each(boost::bind(&ConfigurationObserver::exchangeCreate, _1, e));
+ }
+ void exchangeDestroy(const boost::shared_ptr<Exchange>& e) {
+ each(boost::bind(&ConfigurationObserver::exchangeDestroy, _1, e));
+ }
+ void bind(const boost::shared_ptr<Exchange>& exchange,
+ const boost::shared_ptr<Queue>& queue,
+ const std::string& key,
+ const framing::FieldTable& args) {
+ each(boost::bind(
+ &ConfigurationObserver::bind, _1, exchange, queue, key, args));
+ }
+ void unbind(const boost::shared_ptr<Exchange>& exchange,
+ const boost::shared_ptr<Queue>& queue,
+ const std::string& key,
+ const framing::FieldTable& args) {
+ each(boost::bind(
+ &ConfigurationObserver::unbind, _1, exchange, queue, key, args));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONFIGURATIONOBSERVERS_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
index 07e515f3c9..e9014c80c3 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h
@@ -23,9 +23,7 @@
*/
#include "ConnectionObserver.h"
-#include "qpid/sys/Mutex.h"
-#include <set>
-#include <algorithm>
+#include "Observers.h"
namespace qpid {
namespace broker {
@@ -35,18 +33,10 @@ namespace broker {
* Calling a ConnectionObserver function will call that function on each observer.
* THREAD SAFE.
*/
-class ConnectionObservers : public ConnectionObserver {
+class ConnectionObservers : public ConnectionObserver,
+ public Observers<ConnectionObserver>
+{
public:
- void add(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.insert(observer);
- }
-
- void remove(boost::shared_ptr<ConnectionObserver> observer) {
- sys::Mutex::ScopedLock l(lock);
- observers.erase(observer);
- }
-
void connection(Connection& c) {
each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
}
@@ -62,16 +52,6 @@ class ConnectionObservers : public ConnectionObserver {
void forced(Connection& c, const std::string& text) {
each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text));
}
-
- private:
- typedef std::set<boost::shared_ptr<ConnectionObserver> > Observers;
- sys::Mutex lock;
- Observers observers;
-
- template <class F> void each(F f) {
- sys::Mutex::ScopedLock l(lock);
- std::for_each(observers.begin(), observers.end(), f);
- }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 43d7268dfb..dde59d41c1 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
@@ -69,6 +70,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = i->second(name, durable, args, parent, broker);
}
}
+ if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
exchanges[name] = exchange;
return std::pair<Exchange::shared_ptr, bool>(exchange, true);
} else {
@@ -85,8 +87,10 @@ void ExchangeRegistry::destroy(const string& name){
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
+ Exchange::shared_ptr ex = i->second;
i->second->destroy();
exchanges.erase(i);
+ if (broker) broker->getConfigurationObservers().exchangeDestroy(ex);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h
new file mode 100644
index 0000000000..c62f75d6d0
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Observers.h
@@ -0,0 +1,69 @@
+#ifndef QPID_BROKER_OBSERVERS_H
+#define QPID_BROKER_OBSERVERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for collections of observers with thread-safe add/remove and traversal.
+ */
+template <class Observer>
+class Observers
+{
+ public:
+ void add(boost::shared_ptr<Observer> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.push_back(observer);
+ }
+
+ void remove(boost::shared_ptr<Observer> observer) {
+ sys::Mutex::ScopedLock l(lock);
+ typename List::iterator i = std::find(observers.begin(), observers.end(), observer);
+ observers.erase(i);
+ }
+
+ protected:
+ typedef std::vector<boost::shared_ptr<Observer> > List;
+
+ sys::Mutex lock;
+ List observers;
+
+ template <class F> void each(F f) {
+ List copy;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ copy = observers;
+ }
+ std::for_each(copy.begin(), copy.end(), f);
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_OBSERVERS_H*/
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 236d5ae34c..6647774168 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueEvents.h"
@@ -64,17 +65,22 @@ QueueRegistry::declare(const string& declareName, bool durable,
//i.e. recovering a queue for which we already have a persistent record
queue->configure(arguments);
}
+ if (broker) broker->getConfigurationObservers().queueCreate(queue);
queues[name] = queue;
if (lastNode) queue->setLastNodeFailure();
-
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
return std::pair<Queue::shared_ptr, bool>(i->second, false);
}
}
-void QueueRegistry::destroyLH (const string& name){
- queues.erase(name);
+void QueueRegistry::destroyLH (const string& name) {
+ QueueMap::iterator i = queues.find(name);
+ if (i != queues.end()) {
+ Queue::shared_ptr q = i->second;
+ queues.erase(i);
+ if (broker) broker->getConfigurationObservers().queueDestroy(q);
+ }
}
void QueueRegistry::destroy (const string& name){
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 021e4d559e..6095837cd6 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -94,10 +94,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
statusChanged(l);
- QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo);
+ QPID_LOG(notice, logPrefix << "Broker starting: " << brokerInfo);
}
HaBroker::~HaBroker() {
+ QPID_LOG(debug, logPrefix << "Broker shut down: " << brokerInfo);
broker.getConnectionObservers().remove(excluder);
}