diff options
Diffstat (limited to 'cpp/src/qpid/replication')
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/constants.h | 1 |
3 files changed, 23 insertions, 9 deletions
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 1a3ce1c069..b7d52372f4 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -72,6 +72,7 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index c0cc36efe3..053335e6ad 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -83,15 +83,27 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); if (queue) { - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - msg.deliverTo(queue); - QPID_LOG(debug, "Enqueued replicated message onto " << queueName); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(); - mgmtExchange->inc_byteRoutes( msg.contentSize()); + + SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION)); + + if (queue->getPosition() > seqno1) // test queue.pos < seqnumber + { + QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue"); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + queue->setPosition(--seqno1); // note that queue will ++ before enqueue. + + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queueName); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes(); + mgmtExchange->inc_byteRoutes( msg.contentSize()); + } } } else { QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist"); diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h index fb7085c570..c5ba7d3d6a 100644 --- a/cpp/src/qpid/replication/constants.h +++ b/cpp/src/qpid/replication/constants.h @@ -26,6 +26,7 @@ const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); +const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position"); const int ENQUEUE(1); const int DEQUEUE(2); |
