summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
commit604b624676c20770d6b6b30be3b2fb357892982e (patch)
treed9cf003df5760f7815f94e9181df5429b1bbeaf4 /cpp/src/qpid/broker/Exchange.cpp
parent02b136ad60558724e77cb1d72d9ca06679c7df87 (diff)
downloadqpid-python-604b624676c20770d6b6b30be3b2fb357892982e.tar.gz
QPID-1351
-Support for sequencing messages through an exchange -Related changes - Bug fix for ptr saftey in Headers & FanOut exchange - Added support for int64 and uint64 in fieldvalue / fieldtable - Added tests for fieldtable - Added tests for sequencing message feature. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp26
1 files changed, 24 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 4824fa5742..deb8df3fc4 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -22,8 +22,11 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/MessageProperties.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::management::ManagementAgent;
@@ -32,8 +35,15 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidMsgSequence("qpid.msg_sequence");
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
- name(_name), durable(false), persistenceId(0), mgmtExchange(0)
+ name(_name), durable(false), persistenceId(0), sequence(false),
+ sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
+ sequence(false), sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
}
}
+
+ sequence = _args.get(qpidMsgSequence);
+ if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+
}
Exchange::~Exchange ()
@@ -74,6 +89,13 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
+void Exchange::preRoute(Deliverable& msg){
+ if (sequence){
+ sys::Mutex::ScopedLock lock(sequenceLock);
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
+ }
+}
+
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)