diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
| commit | afefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch) | |
| tree | 70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/replication/ReplicationExchange.cpp | |
| parent | 1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff) | |
| download | qpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz | |
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control this
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/replication/ReplicationExchange.cpp')
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 89 |
1 files changed, 59 insertions, 30 deletions
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index abe8a4dfb6..639cfb5d2e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -38,46 +38,75 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + : Exchange(name, durable, args, parent), queues(qr), init(false) {} std::string ReplicationExchange::getType() const { return typeName; } void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) { if (args) { - std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); - if (eventType == ENQUEUE) { - expectingEnqueue = true; - targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); - QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); - return; - } else if (eventType == DEQUEUE) { - std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); - Queue::shared_ptr queue = queues.find(queueName); - SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); - - QueuedMessage dequeued; - if (queue->acquireMessageAt(position, dequeued)) { - queue->dequeue(0, dequeued); - QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); - } else { - QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + int eventType = args->getAsInt(REPLICATION_EVENT_TYPE); + if (eventType) { + if (isDuplicate(args)) return; + switch (eventType) { + case ENQUEUE: + handleEnqueueEvent(args, msg); + return; + case DEQUEUE: + handleDequeueEvent(args); + return; + default: + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } - - return; - } else if (!eventType.empty()) { - throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } + } else { + QPID_LOG(warning, "Dropping unexpected message with no headers"); } - //if we get here assume its not an event message, assume its an enqueue - if (expectingEnqueue) { - Queue::shared_ptr queue = queues.find(targetQueue); - msg.deliverTo(queue); - expectingEnqueue = false; - targetQueue.clear(); - QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); +} + +void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queue); +} + +void ReplicationExchange::handleDequeueEvent(const FieldTable* args) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } +} + +bool ReplicationExchange::isDuplicate(const FieldTable* args) +{ + SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO)); + if (!init) { + init = true; + sequence = seqno; + return false; + } else if (seqno > sequence) { + if (seqno - sequence > 1) { + QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno); + } + sequence = seqno; + return false; } else { - QPID_LOG(warning, "Dropping unexpected message"); + QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")"); + return true; } } |
