From 5644e4fbfd777921b33874aed13c45d544c8a383 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Fri, 17 Oct 2008 01:27:45 +0000 Subject: Feature requested by AndrewM for M4... - provide initial value support, for late joining consumers git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705443 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Exchange.cpp | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) (limited to 'cpp/src/qpid/broker/Exchange.cpp') diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index a3130d9edb..3cea904676 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -24,6 +24,7 @@ #include "qpid/agent/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" +#include "DeliverableMessage.h" using namespace qpid::broker; using namespace qpid::framing; @@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); +const std::string qpidIVE("qpid.ive"); } Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { - if (parent && parent->sequence){ - parent->sequenceLock.lock(); - parent->sequenceNo++; - msg.getMessage().getProperties()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + if (parent){ + if (parent->sequence || parent->ive) parent->sequenceLock.lock(); + + if (parent->sequence){ + parent->sequenceNo++; + msg.getMessage().getProperties()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } + if (parent->ive) { + parent->lastMsg = &( msg.getMessage()); + } } } Exchange::PreRoute::~PreRoute(){ - if (parent && parent->sequence){ + if (parent && (parent->sequence || parent->ive)){ parent->sequenceLock.unlock(); } } +void Exchange::routeIVE(){ + if (ive && lastMsg.get()){ + DeliverableMessage dmsg(lastMsg); + route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + } +} + + Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + ive = _args.get(qpidIVE); + if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); } Exchange::~Exchange () -- cgit v1.2.1