diff options
| author | Gordon Sim <gsim@apache.org> | 2009-03-09 15:58:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-03-09 15:58:17 +0000 |
| commit | 1214783620f81f2b0b1e69c4c4df874d58cdf85b (patch) | |
| tree | 354f0767a8b113278cb3d5f8208ee8769fe56bac /cpp/src/qpid/broker/Queue.cpp | |
| parent | 8f0e57d62c16d4723e6202127490ec12473e24d0 (diff) | |
| download | qpid-python-1214783620f81f2b0b1e69c4c4df874d58cdf85b.tar.gz | |
QPID-1721: Fixes for replication between clusters when new members are added
* suppress event generation during node catch up
* ensure sequence counters used for duplicate detection are synchronised in both primary and dr clusters when new members join
* connect queue with the event manager within queue registry rather than adapter as the latter path is not used for catchup
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751719 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index bc29815e84..a1a83926bf 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -93,7 +93,8 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0) + eventMgr(0), + insertSeqNo(0) { if (parent != 0) { @@ -551,6 +552,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); + if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); @@ -578,8 +580,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ messages.push_back(qm); listeners.populate(copy); } - if (eventMode && eventMgr) { - eventMgr->enqueued(qm); + if (eventMode) { + if (eventMgr) eventMgr->enqueued(qm); + else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } } copy.notify(); @@ -989,3 +992,9 @@ void Queue::recoveryComplete() for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); pendingDequeues.clear(); } + +void Queue::insertSequenceNumbers(const std::string& key) +{ + seqNoKey = key; + insertSeqNo = !seqNoKey.empty(); +} |
