summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/replication
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
committerGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
commitafefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch)
tree70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/replication
parent1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff)
downloadqpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates * added support for acknowledged transfer over inter-broker bridges * added option to qpid-route to control this git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/replication')
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp92
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.h9
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp89
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.h9
-rw-r--r--cpp/src/qpid/replication/constants.h12
5 files changed, 143 insertions, 68 deletions
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index 80ff77d107..d50ef852ef 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -35,27 +36,14 @@ using namespace qpid::replication::constants;
void ReplicatingEventListener::handle(QueueEvents::Event event)
{
- //create event message and enqueue it on replication queue
- FieldTable headers;
- boost::intrusive_ptr<Message> message;
switch (event.type) {
case QueueEvents::ENQUEUE:
- headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
- message = createEventMessage(headers);
- queue->deliver(message);
- //if its an enqueue, enqueue the message itself on the
- //replication queue also:
- queue->deliver(event.msg.payload);
- QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication");
+ deliverEnqueueMessage(event.msg);
+ QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication");
break;
case QueueEvents::DEQUEUE:
- headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE);
- headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
- headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position);
- message = createEventMessage(headers);
- queue->deliver(message);
- QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
+ deliverDequeueMessage(event.msg);
+ QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
<< event.msg.position << ")");
break;
}
@@ -65,20 +53,64 @@ namespace {
const std::string EMPTY;
}
-boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers)
+void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued)
+{
+ FieldTable headers;
+ headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
+ headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+ headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
+ headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
+ boost::intrusive_ptr<Message> msg(createMessage(headers));
+ queue->deliver(msg);
+}
+
+void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
+{
+ boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
+ FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+ headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+ headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+ queue->deliver(msg);
+}
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers)
+{
+ boost::intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AMQFrame header(in_place<AMQHeaderBody>());
+ header.setBof(false);
+ header.setEof(true);
+ header.setBos(true);
+ header.setEos(true);
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setApplicationHeaders(headers);
+ return msg;
+}
+
+struct AppendingHandler : FrameHandler
+{
+ boost::intrusive_ptr<Message> msg;
+
+ AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {}
+
+ void handle(AMQFrame& f)
+ {
+ msg->getFrames().append(f);
+ }
+};
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original)
{
- boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- header.setBof(false);
- header.setEof(true);
- header.setBos(true);
- header.setEos(true);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setApplicationHeaders(headers);
- return msg;
+ boost::intrusive_ptr<Message> copy(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AppendingHandler handler(copy);
+ handler.handle(method);
+ original->sendHeader(handler, std::numeric_limits<int16_t>::max());
+ original->sendContent(queue, handler, std::numeric_limits<int16_t>::max());
+ return copy;
}
Options* ReplicatingEventListener::getOptions()
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h
index 25e2a5b7b9..7616c7ac8a 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.h
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.h
@@ -28,6 +28,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace replication {
@@ -57,8 +58,14 @@ class ReplicatingEventListener : public Plugin
PluginOptions options;
qpid::broker::Queue::shared_ptr queue;
+ qpid::framing::SequenceNumber sequence;
- boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers);
+ void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
+ void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
+
+ boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
+ boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue,
+ boost::intrusive_ptr<qpid::broker::Message> original);
};
}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
index abe8a4dfb6..639cfb5d2e 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -38,46 +38,75 @@ ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
const FieldTable& args,
QueueRegistry& qr,
Manageable* parent)
- : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {}
+ : Exchange(name, durable, args, parent), queues(qr), init(false) {}
std::string ReplicationExchange::getType() const { return typeName; }
void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
{
if (args) {
- std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE);
- if (eventType == ENQUEUE) {
- expectingEnqueue = true;
- targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE);
- QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue);
- return;
- } else if (eventType == DEQUEUE) {
- std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
- Queue::shared_ptr queue = queues.find(queueName);
- SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
-
- QueuedMessage dequeued;
- if (queue->acquireMessageAt(position, dequeued)) {
- queue->dequeue(0, dequeued);
- QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
- } else {
- QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);
+ if (eventType) {
+ if (isDuplicate(args)) return;
+ switch (eventType) {
+ case ENQUEUE:
+ handleEnqueueEvent(args, msg);
+ return;
+ case DEQUEUE:
+ handleDequeueEvent(args);
+ return;
+ default:
+ throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
}
-
- return;
- } else if (!eventType.empty()) {
- throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
}
+ } else {
+ QPID_LOG(warning, "Dropping unexpected message with no headers");
}
- //if we get here assume its not an event message, assume its an enqueue
- if (expectingEnqueue) {
- Queue::shared_ptr queue = queues.find(targetQueue);
- msg.deliverTo(queue);
- expectingEnqueue = false;
- targetQueue.clear();
- QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue);
+}
+
+void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg)
+{
+ std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+ Queue::shared_ptr queue = queues.find(queueName);
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ msg.deliverTo(queue);
+ QPID_LOG(debug, "Enqueued replicated message onto " << queue);
+}
+
+void ReplicationExchange::handleDequeueEvent(const FieldTable* args)
+{
+ std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+ Queue::shared_ptr queue = queues.find(queueName);
+ SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
+
+ QueuedMessage dequeued;
+ if (queue->acquireMessageAt(position, dequeued)) {
+ queue->dequeue(0, dequeued);
+ QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+ } else {
+ QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ }
+}
+
+bool ReplicationExchange::isDuplicate(const FieldTable* args)
+{
+ SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO));
+ if (!init) {
+ init = true;
+ sequence = seqno;
+ return false;
+ } else if (seqno > sequence) {
+ if (seqno - sequence > 1) {
+ QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno);
+ }
+ sequence = seqno;
+ return false;
} else {
- QPID_LOG(warning, "Dropping unexpected message");
+ QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")");
+ return true;
}
}
diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h
index ed2b5956b6..897e4a954e 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.h
+++ b/cpp/src/qpid/replication/ReplicationExchange.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace replication {
@@ -51,8 +52,12 @@ class ReplicationExchange : public qpid::broker::Exchange
bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
private:
qpid::broker::QueueRegistry& queues;
- bool expectingEnqueue;
- std::string targetQueue;
+ qpid::framing::SequenceNumber sequence;
+ bool init;
+
+ bool isDuplicate(const qpid::framing::FieldTable* args);
+ void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
+ void handleDequeueEvent(const qpid::framing::FieldTable* args);
};
}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h
index b0cef7570c..fb7085c570 100644
--- a/cpp/src/qpid/replication/constants.h
+++ b/cpp/src/qpid/replication/constants.h
@@ -22,10 +22,12 @@ namespace qpid {
namespace replication {
namespace constants {
-const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type");
-const std::string ENQUEUE("enqueue");
-const std::string DEQUEUE("dequeue");
-const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue");
-const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position");
+const std::string REPLICATION_EVENT_TYPE("qpid.replication.type");
+const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
+const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
+const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+
+const int ENQUEUE(1);
+const int DEQUEUE(2);
}}}