diff options
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index a3130d9edb..3cea904676 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -24,6 +24,7 @@ #include "qpid/agent/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" +#include "DeliverableMessage.h" using namespace qpid::broker; using namespace qpid::framing; @@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); +const std::string qpidIVE("qpid.ive"); } Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { - if (parent && parent->sequence){ - parent->sequenceLock.lock(); - parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + if (parent){ + if (parent->sequence || parent->ive) parent->sequenceLock.lock(); + + if (parent->sequence){ + parent->sequenceNo++; + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } + if (parent->ive) { + parent->lastMsg = &( msg.getMessage()); + } } } Exchange::PreRoute::~PreRoute(){ - if (parent && parent->sequence){ + if (parent && (parent->sequence || parent->ive)){ parent->sequenceLock.unlock(); } } +void Exchange::routeIVE(){ + if (ive && lastMsg.get()){ + DeliverableMessage dmsg(lastMsg); + route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + } +} + + Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + ive = _args.get(qpidIVE); + if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); } Exchange::~Exchange () |
