diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-13 18:07:07 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-13 18:07:07 +0000 |
| commit | 604b624676c20770d6b6b30be3b2fb357892982e (patch) | |
| tree | d9cf003df5760f7815f94e9181df5429b1bbeaf4 /cpp/src/qpid/broker/Exchange.cpp | |
| parent | 02b136ad60558724e77cb1d72d9ca06679c7df87 (diff) | |
| download | qpid-python-604b624676c20770d6b6b30be3b2fb357892982e.tar.gz | |
QPID-1351
-Support for sequencing messages through an exchange
-Related changes
- Bug fix for ptr saftey in Headers & FanOut exchange
- Added support for int64 and uint64 in fieldvalue / fieldtable
- Added tests for fieldtable
- Added tests for sequencing message feature.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 4824fa5742..deb8df3fc4 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,8 +22,11 @@ #include "Exchange.h" #include "ExchangeRegistry.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/MessageProperties.h" using namespace qpid::broker; +using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::management::ManagementAgent; @@ -32,8 +35,15 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidMsgSequence("qpid.msg_sequence"); +} + + Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0), mgmtExchange(0) + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), + sequence(false), sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } } } + + sequence = _args.get(qpidMsgSequence); + if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + } Exchange::~Exchange () @@ -74,6 +89,13 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::preRoute(Deliverable& msg){ + if (sequence){ + sys::Mutex::ScopedLock lock(sequenceLock); + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo); + } +} + void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) |
