diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:17 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:17 +0000 |
| commit | b8067d2ecef01588f1fe73c8159cafacd8e1e217 (patch) | |
| tree | 0f931cabf8a95257c64e541ad9dcbc6099f73f2b /qpid/cpp/src | |
| parent | 2c26294c60daa02e19189cbbd935e2441f2c541c (diff) | |
| download | qpid-python-b8067d2ecef01588f1fe73c8159cafacd8e1e217.tar.gz | |
QPID-3603: Move calls to ConfigurationObserver outside of locks.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 80 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 47 |
2 files changed, 69 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index dde59d41c1..b31c7bd7b8 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -43,39 +43,42 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, bool durable, const FieldTable& args){ - RWlock::ScopedWlock locker(lock); - ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - Exchange::shared_ptr exchange; - - if (type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); - }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); - }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); - }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); - }else if (type == ManagementDirectExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); - }else if (type == ManagementTopicExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); - }else if (type == Link::exchangeTypeName) { - exchange = Link::linkExchangeFactory(name); - }else{ - FunctionMap::iterator i = factory.find(type); - if (i == factory.end()) { - throw UnknownExchangeTypeException(); - } else { - exchange = i->second(name, durable, args, parent, broker); + Exchange::shared_ptr exchange; + std::pair<Exchange::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i == exchanges.end()) { + if (type == TopicExchange::typeName){ + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); + }else if(type == DirectExchange::typeName){ + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); + }else if(type == FanOutExchange::typeName){ + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); + }else if (type == HeadersExchange::typeName) { + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); + }else if (type == ManagementDirectExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); + }else if (type == ManagementTopicExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); + }else if (type == Link::exchangeTypeName) { + exchange = Link::linkExchangeFactory(name); + }else{ + FunctionMap::iterator i = factory.find(type); + if (i == factory.end()) { + throw UnknownExchangeTypeException(); + } else { + exchange = i->second(name, durable, args, parent, broker); + } } + exchanges[name] = exchange; + result = std::pair<Exchange::shared_ptr, bool>(exchange, true); + } else { + result = std::pair<Exchange::shared_ptr, bool>(i->second, false); } - if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); - exchanges[name] = exchange; - return std::pair<Exchange::shared_ptr, bool>(exchange, true); - } else { - return std::pair<Exchange::shared_ptr, bool>(i->second, false); } + if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange); + return result; } void ExchangeRegistry::destroy(const string& name){ @@ -84,14 +87,17 @@ void ExchangeRegistry::destroy(const string& name){ (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) || name == "qpid.management") throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'")); - RWlock::ScopedWlock locker(lock); - ExchangeMap::iterator i = exchanges.find(name); - if (i != exchanges.end()) { - Exchange::shared_ptr ex = i->second; - i->second->destroy(); - exchanges.erase(i); - if (broker) broker->getConfigurationObservers().exchangeDestroy(ex); + Exchange::shared_ptr exchange; + { + RWlock::ScopedWlock locker(lock); + ExchangeMap::iterator i = exchanges.find(name); + if (i != exchanges.end()) { + exchange = i->second; + i->second->destroy(); + exchanges.erase(i); + } } + if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange); } Exchange::shared_ptr ExchangeRegistry::find(const string& name){ diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 6647774168..2916d7bb93 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -47,31 +47,36 @@ QueueRegistry::declare(const string& declareName, bool durable, definition from persistente record*/) { - RWlock::ScopedWlock locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); + Queue::shared_ptr queue; + std::pair<Queue::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); - if (alternate) { - queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } - if (!recovering) { - //apply settings & create persistent record if required - queue->create(arguments); + if (i == queues.end()) { + queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); + if (alternate) { + queue->setAlternateExchange(alternate);//need to do this *before* create + alternate->incAlternateUsers(); + } + if (!recovering) { + //apply settings & create persistent record if required + queue->create(arguments); + } else { + //i.e. recovering a queue for which we already have a persistent record + queue->configure(arguments); + } + queues[name] = queue; + if (lastNode) queue->setLastNodeFailure(); + result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { - //i.e. recovering a queue for which we already have a persistent record - queue->configure(arguments); + result = std::pair<Queue::shared_ptr, bool>(i->second, false); } - if (broker) broker->getConfigurationObservers().queueCreate(queue); - queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); } + if (broker && queue) broker->getConfigurationObservers().queueCreate(queue); + return result; } void QueueRegistry::destroyLH (const string& name) { |
