diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-13 18:07:07 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-13 18:07:07 +0000 |
| commit | 604b624676c20770d6b6b30be3b2fb357892982e (patch) | |
| tree | d9cf003df5760f7815f94e9181df5429b1bbeaf4 /cpp/src/qpid/broker | |
| parent | 02b136ad60558724e77cb1d72d9ca06679c7df87 (diff) | |
| download | qpid-python-604b624676c20770d6b6b30be3b2fb357892982e.tar.gz | |
QPID-1351
-Support for sequencing messages through an exchange
-Related changes
- Bug fix for ptr saftey in Headers & FanOut exchange
- Added support for int64 and uint64 in fieldvalue / fieldtable
- Added tests for fieldtable
- Added tests for sequencing message feature.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 1 |
6 files changed, 53 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index b4b892feeb..fe2e9db3d5 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,6 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + preRoute(msg); Queues::ConstPtr p; { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 4824fa5742..deb8df3fc4 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,8 +22,11 @@ #include "Exchange.h" #include "ExchangeRegistry.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/MessageProperties.h" using namespace qpid::broker; +using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::management::ManagementAgent; @@ -32,8 +35,15 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidMsgSequence("qpid.msg_sequence"); +} + + Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0), mgmtExchange(0) + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), + sequence(false), sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } } } + + sequence = _args.get(qpidMsgSequence); + if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + } Exchange::~Exchange () @@ -74,6 +89,13 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::preRoute(Deliverable& msg){ + if (sequence){ + sys::Mutex::ScopedLock lock(sequenceLock); + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo); + } +} + void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 9901fbb18b..54d16a47bd 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -28,6 +28,7 @@ #include "MessageStore.h" #include "PersistableExchange.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" #include "qmf/org/apache/qpid/broker/Binding.h" @@ -45,8 +46,14 @@ namespace qpid { boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; + bool sequence; + mutable qpid::sys::Mutex sequenceLock; + uint64_t sequenceNo; protected: + + void preRoute(Deliverable& msg); + struct Binding : public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 1e03d34b25..42fe537c6a 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -69,15 +69,18 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ + preRoute(msg); uint32_t count(0); BindingsArray::ConstPtr p = bindings.snapshot(); - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (p.get()){ + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } } - + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 5f89202c75..8e62803cc6 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -105,14 +105,17 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ if (!args) return;//can't match if there were no headers passed in + preRoute(msg); uint32_t count(0); Bindings::ConstPtr p = bindings.snapshot(); - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { - if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (p.get()){ + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { + if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } } if (mgmtExchange != 0) @@ -136,9 +139,11 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { Bindings::ConstPtr p = bindings.snapshot(); - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { - if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { - return true; + if (p.get()){ + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { + return true; + } } } return false; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 787cf9637c..cf4a765266 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -180,6 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); + preRoute(msg); uint32_t count(0); Tokens tokens(routingKey); |
