summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp32
1 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index a3130d9edb..3cea904676 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -24,6 +24,7 @@
#include "qpid/agent/ManagementAgent.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
+#include "DeliverableMessage.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidIVE("qpid.ive");
}
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
- if (parent && parent->sequence){
- parent->sequenceLock.lock();
- parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ if (parent){
+ if (parent->sequence || parent->ive) parent->sequenceLock.lock();
+
+ if (parent->sequence){
+ parent->sequenceNo++;
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
+ if (parent->ive) {
+ parent->lastMsg = &( msg.getMessage());
+ }
}
}
Exchange::PreRoute::~PreRoute(){
- if (parent && parent->sequence){
+ if (parent && (parent->sequence || parent->ive)){
parent->sequenceLock.unlock();
}
}
+void Exchange::routeIVE(){
+ if (ive && lastMsg.get()){
+ DeliverableMessage dmsg(lastMsg);
+ route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ }
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ ive = _args.get(qpidIVE);
+ if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
}
Exchange::~Exchange ()