diff options
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 2 |
6 files changed, 28 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index fe2e9db3d5..f760bc9747 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,7 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - preRoute(msg); + PreRoute pr(msg, this); Queues::ConstPtr p; { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index deb8df3fc4..a3130d9edb 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -41,6 +41,20 @@ const std::string qpidMsgSequence("qpid.msg_sequence"); } +Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { + if (parent && parent->sequence){ + parent->sequenceLock.lock(); + parent->sequenceNo++; + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } +} + +Exchange::PreRoute::~PreRoute(){ + if (parent && parent->sequence){ + parent->sequenceLock.unlock(); + } +} + Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), sequenceNo(0), mgmtExchange(0) @@ -89,13 +103,6 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } -void Exchange::preRoute(Deliverable& msg){ - if (sequence){ - sys::Mutex::ScopedLock lock(sequenceLock); - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo); - } -} - void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 54d16a47bd..6006a09ea5 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -46,14 +46,20 @@ namespace qpid { boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; + + protected: bool sequence; mutable qpid::sys::Mutex sequenceLock; uint64_t sequenceNo; - - protected: - - void preRoute(Deliverable& msg); + class PreRoute{ + public: + PreRoute(Deliverable& msg, Exchange* _p); + ~PreRoute(); + private: + Exchange* parent; + }; + struct Binding : public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 42fe537c6a..e92fac41dc 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -69,7 +69,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - preRoute(msg); + PreRoute pr(msg, this); uint32_t count(0); BindingsArray::ConstPtr p = bindings.snapshot(); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 8e62803cc6..b2bd4519bd 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -105,7 +105,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ if (!args) return;//can't match if there were no headers passed in - preRoute(msg); + PreRoute pr(msg, this); uint32_t count(0); diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index cf4a765266..d9be8b0d68 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -180,7 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); - preRoute(msg); + PreRoute pr(msg, this); uint32_t count(0); Tokens tokens(routingKey); |
