diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
| commit | afefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch) | |
| tree | 70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/replication | |
| parent | 1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff) | |
| download | qpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz | |
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control this
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/replication')
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 92 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 89 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/constants.h | 12 |
5 files changed, 143 insertions, 68 deletions
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 80ff77d107..d50ef852ef 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/QueueEvents.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" @@ -35,27 +36,14 @@ using namespace qpid::replication::constants; void ReplicatingEventListener::handle(QueueEvents::Event event) { - //create event message and enqueue it on replication queue - FieldTable headers; - boost::intrusive_ptr<Message> message; switch (event.type) { case QueueEvents::ENQUEUE: - headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE); - headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); - message = createEventMessage(headers); - queue->deliver(message); - //if its an enqueue, enqueue the message itself on the - //replication queue also: - queue->deliver(event.msg.payload); - QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication"); + deliverEnqueueMessage(event.msg); + QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication"); break; case QueueEvents::DEQUEUE: - headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE); - headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); - headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position); - message = createEventMessage(headers); - queue->deliver(message); - QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " + deliverDequeueMessage(event.msg); + QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " << event.msg.position << ")"); break; } @@ -65,20 +53,64 @@ namespace { const std::string EMPTY; } -boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers) +void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued) +{ + FieldTable headers; + headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); + headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); + headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); + headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); + boost::intrusive_ptr<Message> msg(createMessage(headers)); + queue->deliver(msg); +} + +void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) +{ + boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); + FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); + headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); + headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); + headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); + queue->deliver(msg); +} + +boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers) +{ + boost::intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + header.setBof(false); + header.setEof(true); + header.setBos(true); + header.setEos(true); + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setApplicationHeaders(headers); + return msg; +} + +struct AppendingHandler : FrameHandler +{ + boost::intrusive_ptr<Message> msg; + + AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {} + + void handle(AMQFrame& f) + { + msg->getFrames().append(f); + } +}; + +boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original) { - boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - header.setBof(false); - header.setEof(true); - header.setBos(true); - header.setEos(true); - msg->getFrames().append(method); - msg->getFrames().append(header); - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setApplicationHeaders(headers); - return msg; + boost::intrusive_ptr<Message> copy(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AppendingHandler handler(copy); + handler.handle(method); + original->sendHeader(handler, std::numeric_limits<int16_t>::max()); + original->sendContent(queue, handler, std::numeric_limits<int16_t>::max()); + return copy; } Options* ReplicatingEventListener::getOptions() diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h index 25e2a5b7b9..7616c7ac8a 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.h +++ b/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -28,6 +28,7 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace replication { @@ -57,8 +58,14 @@ class ReplicatingEventListener : public Plugin PluginOptions options; qpid::broker::Queue::shared_ptr queue; + qpid::framing::SequenceNumber sequence; - boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers); + void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued); + void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued); + + boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers); + boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue, + boost::intrusive_ptr<qpid::broker::Message> original); }; }} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index abe8a4dfb6..639cfb5d2e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -38,46 +38,75 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, const FieldTable& args, QueueRegistry& qr, Manageable* parent) - : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + : Exchange(name, durable, args, parent), queues(qr), init(false) {} std::string ReplicationExchange::getType() const { return typeName; } void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) { if (args) { - std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); - if (eventType == ENQUEUE) { - expectingEnqueue = true; - targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); - QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); - return; - } else if (eventType == DEQUEUE) { - std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); - Queue::shared_ptr queue = queues.find(queueName); - SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); - - QueuedMessage dequeued; - if (queue->acquireMessageAt(position, dequeued)) { - queue->dequeue(0, dequeued); - QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); - } else { - QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + int eventType = args->getAsInt(REPLICATION_EVENT_TYPE); + if (eventType) { + if (isDuplicate(args)) return; + switch (eventType) { + case ENQUEUE: + handleEnqueueEvent(args, msg); + return; + case DEQUEUE: + handleDequeueEvent(args); + return; + default: + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } - - return; - } else if (!eventType.empty()) { - throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); } + } else { + QPID_LOG(warning, "Dropping unexpected message with no headers"); } - //if we get here assume its not an event message, assume its an enqueue - if (expectingEnqueue) { - Queue::shared_ptr queue = queues.find(targetQueue); - msg.deliverTo(queue); - expectingEnqueue = false; - targetQueue.clear(); - QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); +} + +void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); + headers.erase(REPLICATION_TARGET_QUEUE); + headers.erase(REPLICATION_EVENT_SEQNO); + headers.erase(REPLICATION_EVENT_TYPE); + msg.deliverTo(queue); + QPID_LOG(debug, "Enqueued replicated message onto " << queue); +} + +void ReplicationExchange::handleDequeueEvent(const FieldTable* args) +{ + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } +} + +bool ReplicationExchange::isDuplicate(const FieldTable* args) +{ + SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO)); + if (!init) { + init = true; + sequence = seqno; + return false; + } else if (seqno > sequence) { + if (seqno - sequence > 1) { + QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno); + } + sequence = seqno; + return false; } else { - QPID_LOG(warning, "Dropping unexpected message"); + QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")"); + return true; } } diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h index ed2b5956b6..897e4a954e 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.h +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -22,6 +22,7 @@ * */ #include "qpid/broker/Exchange.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace replication { @@ -51,8 +52,12 @@ class ReplicationExchange : public qpid::broker::Exchange bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); private: qpid::broker::QueueRegistry& queues; - bool expectingEnqueue; - std::string targetQueue; + qpid::framing::SequenceNumber sequence; + bool init; + + bool isDuplicate(const qpid::framing::FieldTable* args); + void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg); + void handleDequeueEvent(const qpid::framing::FieldTable* args); }; }} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h index b0cef7570c..fb7085c570 100644 --- a/cpp/src/qpid/replication/constants.h +++ b/cpp/src/qpid/replication/constants.h @@ -22,10 +22,12 @@ namespace qpid { namespace replication { namespace constants { -const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type"); -const std::string ENQUEUE("enqueue"); -const std::string DEQUEUE("dequeue"); -const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue"); -const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position"); +const std::string REPLICATION_EVENT_TYPE("qpid.replication.type"); +const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno"); +const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue"); +const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message"); + +const int ENQUEUE(1); +const int DEQUEUE(2); }}} |
