summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:20:17 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:20:17 +0000
commitb8067d2ecef01588f1fe73c8159cafacd8e1e217 (patch)
tree0f931cabf8a95257c64e541ad9dcbc6099f73f2b /qpid/cpp/src
parent2c26294c60daa02e19189cbbd935e2441f2c541c (diff)
downloadqpid-python-b8067d2ecef01588f1fe73c8159cafacd8e1e217.tar.gz
QPID-3603: Move calls to ConfigurationObserver outside of locks.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp80
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp47
2 files changed, 69 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index dde59d41c1..b31c7bd7b8 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -43,39 +43,42 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
bool durable, const FieldTable& args){
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- Exchange::shared_ptr exchange;
-
- if (type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
- }else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
- }else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
- }else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementDirectExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
- }else if (type == ManagementTopicExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
- }else if (type == Link::exchangeTypeName) {
- exchange = Link::linkExchangeFactory(name);
- }else{
- FunctionMap::iterator i = factory.find(type);
- if (i == factory.end()) {
- throw UnknownExchangeTypeException();
- } else {
- exchange = i->second(name, durable, args, parent, broker);
+ Exchange::shared_ptr exchange;
+ std::pair<Exchange::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ if (type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
+ }else if (type == Link::exchangeTypeName) {
+ exchange = Link::linkExchangeFactory(name);
+ }else{
+ FunctionMap::iterator i = factory.find(type);
+ if (i == factory.end()) {
+ throw UnknownExchangeTypeException();
+ } else {
+ exchange = i->second(name, durable, args, parent, broker);
+ }
}
+ exchanges[name] = exchange;
+ result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
- if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
- exchanges[name] = exchange;
- return std::pair<Exchange::shared_ptr, bool>(exchange, true);
- } else {
- return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
+ return result;
}
void ExchangeRegistry::destroy(const string& name){
@@ -84,14 +87,17 @@ void ExchangeRegistry::destroy(const string& name){
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- RWlock::ScopedWlock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i != exchanges.end()) {
- Exchange::shared_ptr ex = i->second;
- i->second->destroy();
- exchanges.erase(i);
- if (broker) broker->getConfigurationObservers().exchangeDestroy(ex);
+ Exchange::shared_ptr exchange;
+ {
+ RWlock::ScopedWlock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i != exchanges.end()) {
+ exchange = i->second;
+ i->second->destroy();
+ exchanges.erase(i);
+ }
}
+ if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 6647774168..2916d7bb93 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -47,31 +47,36 @@ QueueRegistry::declare(const string& declareName, bool durable,
definition from persistente
record*/)
{
- RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
+ Queue::shared_ptr queue;
+ std::pair<Queue::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
- if (alternate) {
- queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
- if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
+ if (i == queues.end()) {
+ queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ if (alternate) {
+ queue->setAlternateExchange(alternate);//need to do this *before* create
+ alternate->incAlternateUsers();
+ }
+ if (!recovering) {
+ //apply settings & create persistent record if required
+ queue->create(arguments);
+ } else {
+ //i.e. recovering a queue for which we already have a persistent record
+ queue->configure(arguments);
+ }
+ queues[name] = queue;
+ if (lastNode) queue->setLastNodeFailure();
+ result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
- if (broker) broker->getConfigurationObservers().queueCreate(queue);
- queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ return result;
}
void QueueRegistry::destroyLH (const string& name) {