summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h11
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp35
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h3
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h7
6 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index af4ae12177..dd41f74790 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -265,14 +265,24 @@ void Primary::addReplica(ReplicatingSubscription& rs) {
replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}
-void Primary::skip(
+void Primary::skipEnqueues(
const types::Uuid& backup,
const boost::shared_ptr<broker::Queue>& queue,
const ReplicationIdSet& ids)
{
sys::Mutex::ScopedLock l(lock);
ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
- if (i != replicas.end()) i->second->addSkip(ids);
+ if (i != replicas.end()) i->second->skipEnqueues(ids);
+}
+
+void Primary::skipDequeues(
+ const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids)
+{
+ sys::Mutex::ScopedLock l(lock);
+ ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
+ if (i != replicas.end()) i->second->skipDequeues(ids);
}
// Called from ReplicatingSubscription::cancel
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index af368bca0f..46cf990834 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -90,9 +90,14 @@ class Primary : public Role
void removeReplica(const ReplicatingSubscription&);
/** Skip replication of ids to queue on backup. */
- void skip(const types::Uuid& backup,
- const boost::shared_ptr<broker::Queue>& queue,
- const ReplicationIdSet& ids);
+ void skipEnqueues(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
+
+ /** Skip replication of dequeue of ids to queue on backup. */
+ void skipDequeues(const types::Uuid& backup,
+ const boost::shared_ptr<broker::Queue>& queue,
+ const ReplicationIdSet& ids);
// Called via BrokerObserver
void queueCreate(const QueuePtr&);
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index c94ced7024..be3dc25653 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -165,6 +165,7 @@ void PrimaryTxObserver::dequeue(
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
empty = false;
+ dequeues[q] += id;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
}
@@ -180,25 +181,30 @@ struct Skip {
const ReplicationIdSet& ids_) :
backup(backup_), queue(queue_), ids(ids_) {}
- void skip(Primary& p) const { p.skip(backup, queue, ids); }
+ void skipEnqueues(Primary& p) const { p.skipEnqueues(backup, queue, ids); }
+ void skipDequeues(Primary& p) const { p.skipDequeues(backup, queue, ids); }
};
} // namespace
+void PrimaryTxObserver::skip(Mutex::ScopedLock&) {
+ // Tell replicating subscriptions to skip IDs in the transaction.
+ vector<Skip> skipEnq, skipDeq;
+ for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) {
+ for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
+ skipEnq.push_back(Skip(*b, q->first, q->second));
+ for (QueueIdsMap::iterator q = dequeues.begin(); q != dequeues.end(); ++q)
+ skipDeq.push_back(Skip(*b, q->first, q->second));
+ }
+ Mutex::ScopedUnlock u(lock); // Outside lock
+ for_each(skipEnq.begin(), skipEnq.end(), boost::bind(&Skip::skipEnqueues, _1, boost::ref(primary)));
+ for_each(skipDeq.begin(), skipDeq.end(), boost::bind(&Skip::skipDequeues, _1, boost::ref(primary)));
+}
+
bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare " << backups);
- vector<Skip> skips;
- {
- Mutex::ScopedLock l(lock);
- checkState(SENDING, "Too late for prepare");
- state = PREPARING;
- // Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
- for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
- skips.push_back(Skip(*b, q->first, q->second));
- }
- // Outside lock
- for_each(skips.begin(), skips.end(),
- boost::bind(&Skip::skip, _1, boost::ref(primary)));
+ Mutex::ScopedLock l(lock);
+ checkState(SENDING, "Too late for prepare");
+ state = PREPARING;
txQueue->deliver(TxPrepareEvent().message());
return true;
}
@@ -208,6 +214,7 @@ void PrimaryTxObserver::commit() {
Mutex::ScopedLock l(lock);
checkState(PREPARING, "Cannot commit, not preparing");
if (incomplete.size() == 0) {
+ skip(l); // Tell local replicating subscriptions to skip tx enqueue/dequeue.
txQueue->deliver(TxCommitEvent().message());
end(l);
} else {
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 5b7c2e3e93..6ea1ba185b 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -99,6 +99,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
PrimaryTxObserver(Primary&, HaBroker&, const boost::intrusive_ptr<broker::TxBuffer>&);
void initialize();
+ void skip(sys::Mutex::ScopedLock&);
void checkState(State expect, const std::string& msg);
void end(sys::Mutex::ScopedLock&);
void txPrepareOkEvent(const std::string& data);
@@ -120,7 +121,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
types::Uuid id;
std::string exchangeName;
QueuePtr txQueue;
- QueueIdsMap enqueues;
+ QueueIdsMap enqueues, dequeues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
bool empty; // True if the transaction is empty - no enqueues/dequeues.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index a0cfa393aa..908458fad3 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -161,7 +161,7 @@ void ReplicatingSubscription::initialize() {
{
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
- skip = backupIds - initDequeues; // Messages already on the backup.
+ skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
position = front;
@@ -169,7 +169,7 @@ void ReplicatingSubscription::initialize() {
QPID_LOG(debug, logPrefix << "Subscribed: front " << front
<< ", back " << back
<< ", guarded " << guard->getFirst()
- << ", on backup " << skip);
+ << ", on backup " << skipEnqueue);
checkReady(l);
}
@@ -215,9 +215,9 @@ bool ReplicatingSubscription::deliver(
position = m.getSequence();
try {
bool result = false;
- if (skip.contains(id)) {
+ if (skipEnqueue.contains(id)) {
QPID_LOG(trace, logPrefix << "Skip " << LogMessageId(*getQueue(), m));
- skip -= id;
+ skipEnqueue -= id;
guard->complete(id); // This will never be acknowledged.
notify();
result = true;
@@ -281,6 +281,9 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
+ ReplicationIdSet oldDequeues = dequeues;
+ dequeues -= skipDequeue; // Don't send skipped dequeues
+ skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
sendEvent(DequeueEvent(dequeues), l);
@@ -332,9 +335,14 @@ bool ReplicatingSubscription::doDispatch()
}
}
-void ReplicatingSubscription::addSkip(const ReplicationIdSet& ids) {
+void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
Mutex::ScopedLock l(lock);
- skip += ids;
+ skipEnqueue += ids;
+}
+
+void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
+ Mutex::ScopedLock l(lock);
+ skipDequeue += ids;
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 868442da7e..0e3f544d44 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -137,8 +137,8 @@ class ReplicatingSubscription :
BrokerInfo getBrokerInfo() const { return info; }
- /** Skip replicating enqueue of of ids. */
- void addSkip(const ReplicationIdSet& ids);
+void skipEnqueues(const ReplicationIdSet& ids);
+void skipDequeues(const ReplicationIdSet& ids);
protected:
bool doDispatch();
@@ -147,7 +147,8 @@ class ReplicatingSubscription :
std::string logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
- ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
+ ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
+ ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
bool wasStopped;
bool ready;