diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 7 |
6 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index af4ae12177..dd41f74790 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -265,14 +265,24 @@ void Primary::addReplica(ReplicatingSubscription& rs) { replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; } -void Primary::skip( +void Primary::skipEnqueues( const types::Uuid& backup, const boost::shared_ptr<broker::Queue>& queue, const ReplicationIdSet& ids) { sys::Mutex::ScopedLock l(lock); ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); - if (i != replicas.end()) i->second->addSkip(ids); + if (i != replicas.end()) i->second->skipEnqueues(ids); +} + +void Primary::skipDequeues( + const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids) +{ + sys::Mutex::ScopedLock l(lock); + ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue)); + if (i != replicas.end()) i->second->skipDequeues(ids); } // Called from ReplicatingSubscription::cancel diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index af368bca0f..46cf990834 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -90,9 +90,14 @@ class Primary : public Role void removeReplica(const ReplicatingSubscription&); /** Skip replication of ids to queue on backup. */ - void skip(const types::Uuid& backup, - const boost::shared_ptr<broker::Queue>& queue, - const ReplicationIdSet& ids); + void skipEnqueues(const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids); + + /** Skip replication of dequeue of ids to queue on backup. */ + void skipDequeues(const types::Uuid& backup, + const boost::shared_ptr<broker::Queue>& queue, + const ReplicationIdSet& ids); // Called via BrokerObserver void queueCreate(const QueuePtr&); diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index c94ced7024..be3dc25653 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -165,6 +165,7 @@ void PrimaryTxObserver::dequeue( if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); empty = false; + dequeues[q] += id; txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); } } @@ -180,25 +181,30 @@ struct Skip { const ReplicationIdSet& ids_) : backup(backup_), queue(queue_), ids(ids_) {} - void skip(Primary& p) const { p.skip(backup, queue, ids); } + void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); } + void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); } }; } // namespace +void PrimaryTxObserver::skip(Mutex::ScopedLock&) { + // Tell replicating subscriptions to skip IDs in the transaction. + vector<Skip> skipEnq, skipDeq; + for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) { + for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) + skipEnq.push_back(Skip(*b, q->first, q->second)); + for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q) + skipDeq.push_back(Skip(*b, q->first, q->second)); + } + Mutex::ScopedUnlock u(lock); // Outside lock + for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary))); + for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary))); +} + bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare " << backups); - vector<Skip> skips; - { - Mutex::ScopedLock l(lock); - checkState(SENDING, "Too late for prepare"); - state = PREPARING; - // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) - for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) - skips.push_back(Skip(*b, q->first, q->second)); - } - // Outside lock - for_each(skips.begin(), skips.end(), - boost::bind(&Skip::skip, _1, boost::ref(primary))); + Mutex::ScopedLock l(lock); + checkState(SENDING, "Too late for prepare"); + state = PREPARING; txQueue->deliver(TxPrepareEvent().message()); return true; } @@ -208,6 +214,7 @@ void PrimaryTxObserver::commit() { Mutex::ScopedLock l(lock); checkState(PREPARING, "Cannot commit, not preparing"); if (incomplete.size() == 0) { + skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue. txQueue->deliver(TxCommitEvent().message()); end(l); } else { diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 5b7c2e3e93..6ea1ba185b 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -99,6 +99,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&); void initialize(); + void skip(sys::Mutex::ScopedLock&); void checkState(State expect, const std::string& msg); void end(sys::Mutex::ScopedLock&); void txPrepareOkEvent(const std::string& data); @@ -120,7 +121,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, types::Uuid id; std::string exchangeName; QueuePtr txQueue; - QueueIdsMap enqueues; + QueueIdsMap enqueues, dequeues; UuidSet backups; // All backups of transaction. UuidSet incomplete; // Incomplete backups (not yet responded to prepare) bool empty; // True if the transaction is empty - no enqueues/dequeues. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index a0cfa393aa..908458fad3 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -161,7 +161,7 @@ void ReplicatingSubscription::initialize() { { sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. - skip = backupIds - initDequeues; // Messages already on the backup. + skipEnqueue = backupIds - initDequeues; // Messages already on the backup. // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. position = front; @@ -169,7 +169,7 @@ void ReplicatingSubscription::initialize() { QPID_LOG(debug, logPrefix << "Subscribed: front " << front << ", back " << back << ", guarded " << guard->getFirst() - << ", on backup " << skip); + << ", on backup " << skipEnqueue); checkReady(l); } @@ -215,9 +215,9 @@ bool ReplicatingSubscription::deliver( position = m.getSequence(); try { bool result = false; - if (skip.contains(id)) { + if (skipEnqueue.contains(id)) { QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m)); - skip -= id; + skipEnqueue -= id; guard->complete(id); // This will never be acknowledged. notify(); result = true; @@ -281,6 +281,9 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l) { + ReplicationIdSet oldDequeues = dequeues; + dequeues -= skipDequeue; // Don't send skipped dequeues + skipDequeue -= oldDequeues; // Forget dequeues that would have been sent. if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); sendEvent(DequeueEvent(dequeues), l); @@ -332,9 +335,14 @@ bool ReplicatingSubscription::doDispatch() } } -void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) { +void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) { Mutex::ScopedLock l(lock); - skip += ids; + skipEnqueue += ids; +} + +void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) { + Mutex::ScopedLock l(lock); + skipDequeue += ids; } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 868442da7e..0e3f544d44 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -137,8 +137,8 @@ class ReplicatingSubscription : BrokerInfo getBrokerInfo() const { return info; } - /** Skip replicating enqueue of of ids. */ - void addSkip(const ReplicationIdSet& ids); +void skipEnqueues(const ReplicationIdSet& ids); +void skipDequeues(const ReplicationIdSet& ids); protected: bool doDispatch(); @@ -147,7 +147,8 @@ class ReplicatingSubscription : std::string logPrefix; QueuePosition position; ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. - ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. + ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues. + ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. bool wasStopped; bool ready; |
