diff options
Diffstat (limited to 'cpp/src/qpid/replication/ReplicationExchange.cpp')
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 89 |
1 files changed, 59 insertions, 30 deletions
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index abe8a4dfb6..639cfb5d2e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -38,46 +38,75 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + : Exchange(name, durable, args, parent), queues(qr), init(false) {} std::string ReplicationExchange::getType() const { return typeName; } void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) { if (args) { - std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); - if (eventType == ENQUEUE) { - expectingEnqueue = true; - targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); - QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); - return; - } else if (eventType == DEQUEUE) { - std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); - Queue::shared_ptr queue = queues.find(queueName); - SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); - - QueuedMessage dequeued; - if (queue->acquireMessageAt(position, dequeued)) { - queue->dequeue(0, dequeued); - QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); - } else { - QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + int eventType = args->getAsInt(REPLICATION_EVENT_TYPE); + if (eventType) { + if (isDuplicate(args)) return; + switch (eventType) { + case ENQUEUE: + handleEnqueueEvent(args, msg); + return; + case DEQUEUE: + handleDequeueEvent(args); + return; + default: + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } - - return; - } else if (!eventType.empty()) { - throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } + } else { + QPID_LOG(warning, "Dropping unexpected message with no headers"); } - //if we get here assume its not an event message, assume its an enqueue - if (expectingEnqueue) { - Queue::shared_ptr queue = queues.find(targetQueue); - msg.deliverTo(queue); - expectingEnqueue = false; - targetQueue.clear(); - QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); +} + +void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queue); +} + +void ReplicationExchange::handleDequeueEvent(const FieldTable* args) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } +} + +bool ReplicationExchange::isDuplicate(const FieldTable* args) +{ + SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO)); + if (!init) { + init = true; + sequence = seqno; + return false; + } else if (seqno > sequence) { + if (seqno - sequence > 1) { + QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno); + } + sequence = seqno; + return false; } else { - QPID_LOG(warning, "Dropping unexpected message"); + QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")"); + return true; } } |
