diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DirectExchange.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.h | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/FanOutExchange.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStore.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 1 |
16 files changed, 43 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 03036fb825..d834399180 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -196,7 +196,7 @@ void Broker::declareStandardExchange(const std::string& name, const std::string& bool storeEnabled = store != NULL; std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { - store->create(*status.first); + store->create(*status.first, framing::FieldTable ()); } } diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index d7fd20db1d..ea964ef3a3 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -65,7 +65,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { if (durable) { - getBroker().getStore().create(*response.first); + getBroker().getStore().create(*response.first, args); } if (alternate) { response.first->setAlternate(alternate); diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 43b707a5c8..72021b8d98 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -54,7 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingKey].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); + mgmtExchange->inc_bindings(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); } return true; } else{ @@ -78,6 +79,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } if (mgmtExchange.get() != 0) { mgmtExchange->dec_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); } return true; } else { diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 83466085bc..47d616cf16 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -103,8 +103,9 @@ ManagementObject::shared_ptr Exchange::GetManagementObject (void) const return dynamic_pointer_cast<ManagementObject> (mgmtExchange); } -Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent) - : queue(_queue), key(_key) +Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, + FieldTable _args) + : queue(_queue), key(_key), args(_args) { if (parent != 0) { @@ -116,7 +117,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { uint64_t queueId = mo->getObjectId(); mgmtBinding = management::Binding::shared_ptr - (new management::Binding (this, (Manageable*) parent, queueId, key)); + (new management::Binding (this, (Manageable*) parent, queueId, key, args)); agent->addObject (mgmtBinding); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index e9f5b3965a..7902eb4219 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -51,12 +51,13 @@ namespace qpid { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; - Queue::shared_ptr queue; - const std::string key; - const qpid::framing::FieldTable args; + Queue::shared_ptr queue; + const std::string key; + const framing::FieldTable args; management::Binding::shared_ptr mgmtBinding; - Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0); + Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0, + framing::FieldTable args = framing::FieldTable ()); ~Binding (); management::ManagementObject::shared_ptr GetManagementObject () const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 714d4ea444..df723d2c8f 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -54,6 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, bindings.push_back(binding); if (mgmtExchange.get() != 0) { mgmtExchange->inc_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); } return true; } else { @@ -73,6 +74,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* bindings.erase(i); if (mgmtExchange.get() != 0) { mgmtExchange->dec_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); } return true; } else { diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 886ad4767b..5196099ed5 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -85,12 +85,13 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co break; if (i == bindings.end()) { - Binding::shared_ptr binding (new Binding (bindingKey, queue, this)); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); HeaderMap headerMap(*args, binding); bindings.push_back(headerMap); if (mgmtExchange.get() != 0) { mgmtExchange->inc_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); } return true; } else { @@ -115,6 +116,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, bindings.erase(i); if (mgmtExchange.get() != 0) { mgmtExchange->dec_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); } return true; } else { diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index 72633b04a5..76469ccc50 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -26,6 +26,7 @@ #include "PersistableQueue.h" #include "RecoveryManager.h" #include "TransactionalStore.h" +#include "qpid/framing/FieldTable.h" #include <qpid/Options.h> @@ -56,7 +57,8 @@ public: /** * Record the existence of a durable queue */ - virtual void create(PersistableQueue& queue) = 0; + virtual void create(PersistableQueue& queue, + const framing::FieldTable& args) = 0; /** * Destroy a durable queue */ @@ -65,7 +67,8 @@ public: /** * Record the existence of a durable exchange */ - virtual void create(const PersistableExchange& exchange) = 0; + virtual void create(const PersistableExchange& exchange, + const framing::FieldTable& args) = 0; /** * Destroy a durable exchange */ diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 5d684ce6d7..e02c87f069 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -27,6 +27,7 @@ using boost::intrusive_ptr; using namespace qpid::broker; +using qpid::framing::FieldTable; MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {} @@ -37,9 +38,9 @@ MessageStoreModule::~MessageStoreModule() bool MessageStoreModule::init(const Options*) { return true; } -void MessageStoreModule::create(PersistableQueue& queue) +void MessageStoreModule::create(PersistableQueue& queue, const FieldTable& args) { - TRANSFER_EXCEPTION(store->create(queue)); + TRANSFER_EXCEPTION(store->create(queue, args)); } void MessageStoreModule::destroy(PersistableQueue& queue) @@ -47,9 +48,9 @@ void MessageStoreModule::destroy(PersistableQueue& queue) TRANSFER_EXCEPTION(store->destroy(queue)); } -void MessageStoreModule::create(const PersistableExchange& exchange) +void MessageStoreModule::create(const PersistableExchange& exchange, const FieldTable& args) { - TRANSFER_EXCEPTION(store->create(exchange)); + TRANSFER_EXCEPTION(store->create(exchange, args)); } void MessageStoreModule::destroy(const PersistableExchange& exchange) diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index abc0fbfd7a..c7ad76d8bb 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -49,9 +49,9 @@ public: void abort(TransactionContext& txn); void collectPreparedXids(std::set<std::string>& xids); - void create(PersistableQueue& queue); + void create(PersistableQueue& queue, const framing::FieldTable& args); void destroy(PersistableQueue& queue); - void create(const PersistableExchange& exchange); + void create(const PersistableExchange& exchange, const framing::FieldTable& args); void destroy(const PersistableExchange& exchange); void bind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index 0dc7dbb82d..8936b0440f 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -53,7 +53,7 @@ NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} bool NullMessageStore::init(const Options* /*options*/) {return true;} -void NullMessageStore::create(PersistableQueue& queue) +void NullMessageStore::create(PersistableQueue& queue, const framing::FieldTable& /*args*/) { QPID_LOG(info, "Queue '" << queue.getName() << "' will not be durable. Persistence not enabled."); @@ -63,7 +63,7 @@ void NullMessageStore::destroy(PersistableQueue&) { } -void NullMessageStore::create(const PersistableExchange& exchange) +void NullMessageStore::create(const PersistableExchange& exchange, const framing::FieldTable& /*args*/) { QPID_LOG(info, "Exchange'" << exchange.getName() << "' will not be durable. Persistence not enabled."); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index 7cde4db70b..96d1c483a2 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -48,9 +48,9 @@ public: virtual void abort(TransactionContext& txn); virtual void collectPreparedXids(std::set<std::string>& xids); - virtual void create(PersistableQueue& queue); + virtual void create(PersistableQueue& queue, const framing::FieldTable& args); virtual void destroy(PersistableQueue& queue); - virtual void create(const PersistableExchange& exchange); + virtual void create(const PersistableExchange& exchange, const framing::FieldTable& args); virtual void destroy(const PersistableExchange& exchange); virtual void bind(const PersistableExchange& exchange, const PersistableQueue& queue, diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 7ec72951df..591e9796d6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -474,7 +474,7 @@ void Queue::create(const FieldTable& _settings) { settings = _settings; if (store) { - store->create(*this); + store->create(*this, _settings); } configure(_settings); } @@ -484,11 +484,13 @@ void Queue::configure(const FieldTable& _settings) std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings)); if (_policy->getMaxCount() || _policy->getMaxSize()) { setPolicy(_policy); - } + } if (owner) { noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); } + if (mgmtObject.get() != 0) + mgmtObject->set_arguments (_settings); } void Queue::destroy() diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 497f381807..d30a1dc696 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -66,7 +66,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { if (durable) { - getBroker().getStore().create(*response.first); + getBroker().getStore().create(*response.first, args); } if (alternate) { response.first->setAlternate(alternate); diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 5330ee4fd0..1c4fa2ea7a 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -139,6 +139,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons bindings[routingPattern].push_back(binding); if (mgmtExchange.get() != 0) { mgmtExchange->inc_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); } return true; } @@ -159,6 +160,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co if(qv.empty()) bindings.erase(bi); if (mgmtExchange.get() != 0) { mgmtExchange->dec_bindings (); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); } return true; } diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index ca8a0fd558..48a3372d16 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -60,6 +60,7 @@ class ManagementObject static const uint8_t TYPE_FLOAT = 12; static const uint8_t TYPE_DOUBLE = 13; static const uint8_t TYPE_UUID = 14; + static const uint8_t TYPE_FTABLE = 15; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; |
