diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-17 01:27:45 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-17 01:27:45 +0000 |
| commit | 5644e4fbfd777921b33874aed13c45d544c8a383 (patch) | |
| tree | 7f9f1527a43f46ff92a43088ac13d0faf6224719 /cpp/src/qpid/broker/Exchange.cpp | |
| parent | bf54dc92bf7d46862cbef3113314b7b16797d92e (diff) | |
| download | qpid-python-5644e4fbfd777921b33874aed13c45d544c8a383.tar.gz | |
Feature requested by AndrewM for M4...
- provide initial value support, for late joining consumers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705443 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index a3130d9edb..3cea904676 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -24,6 +24,7 @@ #include "qpid/agent/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" +#include "DeliverableMessage.h" using namespace qpid::broker; using namespace qpid::framing; @@ -38,26 +39,41 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); +const std::string qpidIVE("qpid.ive"); } Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { - if (parent && parent->sequence){ - parent->sequenceLock.lock(); - parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + if (parent){ + if (parent->sequence || parent->ive) parent->sequenceLock.lock(); + + if (parent->sequence){ + parent->sequenceNo++; + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + } + if (parent->ive) { + parent->lastMsg = &( msg.getMessage()); + } } } Exchange::PreRoute::~PreRoute(){ - if (parent && parent->sequence){ + if (parent && (parent->sequence || parent->ive)){ parent->sequenceLock.unlock(); } } +void Exchange::routeIVE(){ + if (ive && lastMsg.get()){ + DeliverableMessage dmsg(lastMsg); + route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders()); + } +} + + Exchange::Exchange (const string& _name, Manageable* parent) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), mgmtExchange(0) + sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -73,7 +89,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), - sequence(false), sequenceNo(0), mgmtExchange(0) + sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) { if (parent != 0) { @@ -95,6 +111,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + ive = _args.get(qpidIVE); + if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); } Exchange::~Exchange () |
