diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/constants.h | 1 |
5 files changed, 30 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index ee5831fed0..3c8c237b98 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -1012,6 +1012,10 @@ void Queue::setPosition(SequenceNumber n) { sequence = n; } +SequenceNumber Queue::getPosition() { + return sequence; +} + int Queue::getEventMode() { return eventMode; } void Queue::setQueueEventManager(QueueEvents& mgr) diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 475766ae30..6703d06bbb 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -324,6 +324,9 @@ namespace qpid { * Used by cluster to replicate queues. */ void setPosition(framing::SequenceNumber pos); + /** return current position sequence number for the next message on the queue. + */ + framing::SequenceNumber getPosition(); int getEventMode(); void setQueueEventManager(QueueEvents&); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 1a3ce1c069..b7d52372f4 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -72,6 +72,7 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index c0cc36efe3..053335e6ad 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -83,15 +83,27 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); Queue::shared_ptr queue = queues.find(queueName); if (queue) { - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - msg.deliverTo(queue); - QPID_LOG(debug, "Enqueued replicated message onto " << queueName); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(); - mgmtExchange->inc_byteRoutes( msg.contentSize()); + + SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION)); + + if (queue->getPosition() > seqno1) // test queue.pos < seqnumber + { + QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue"); + mgmtExchange->inc_msgDrops(); + mgmtExchange->inc_byteDrops(msg.contentSize()); + } else { + queue->setPosition(--seqno1); // note that queue will ++ before enqueue. + + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queueName); + if (mgmtExchange != 0) { + mgmtExchange->inc_msgRoutes(); + mgmtExchange->inc_byteRoutes( msg.contentSize()); + } } } else { QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist"); diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h index fb7085c570..c5ba7d3d6a 100644 --- a/cpp/src/qpid/replication/constants.h +++ b/cpp/src/qpid/replication/constants.h @@ -26,6 +26,7 @@ const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); +const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position"); const int ENQUEUE(1); const int DEQUEUE(2); |
