summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp21
-rw-r--r--cpp/src/qpid/broker/Exchange.h14
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp2
6 files changed, 28 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index fe2e9db3d5..f760bc9747 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -70,7 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- preRoute(msg);
+ PreRoute pr(msg, this);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index deb8df3fc4..a3130d9edb 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -41,6 +41,20 @@ const std::string qpidMsgSequence("qpid.msg_sequence");
}
+Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
+ if (parent && parent->sequence){
+ parent->sequenceLock.lock();
+ parent->sequenceNo++;
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
+}
+
+Exchange::PreRoute::~PreRoute(){
+ if (parent && parent->sequence){
+ parent->sequenceLock.unlock();
+ }
+}
+
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
sequenceNo(0), mgmtExchange(0)
@@ -89,13 +103,6 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
-void Exchange::preRoute(Deliverable& msg){
- if (sequence){
- sys::Mutex::ScopedLock lock(sequenceLock);
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
- }
-}
-
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 54d16a47bd..6006a09ea5 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -46,14 +46,20 @@ namespace qpid {
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+
+ protected:
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
uint64_t sequenceNo;
-
- protected:
-
- void preRoute(Deliverable& msg);
+ class PreRoute{
+ public:
+ PreRoute(Deliverable& msg, Exchange* _p);
+ ~PreRoute();
+ private:
+ Exchange* parent;
+ };
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 42fe537c6a..e92fac41dc 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -69,7 +69,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
BindingsArray::ConstPtr p = bindings.snapshot();
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 8e62803cc6..b2bd4519bd 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -105,7 +105,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index cf4a765266..d9be8b0d68 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -180,7 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
Tokens tokens(routingKey);