diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueEvents.cpp | 87 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueEvents.h | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/client/QueueOptions.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 122 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.h | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 139 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.h | 59 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/constants.h | 31 |
14 files changed, 654 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 37750f8352..42a4a7b3be 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -144,6 +144,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayHardLimit*1024), *this), queueCleaner(queues, timer), + queueEvents(poller), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index ac972b0325..247493d41c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -32,6 +32,7 @@ #include "LinkRegistry.h" #include "SessionManager.h" #include "QueueCleaner.h" +#include "QueueEvents.h" #include "Vhost.h" #include "System.h" #include "Timer.h" @@ -128,6 +129,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; QueueCleaner queueCleaner; + QueueEvents queueEvents; void declareStandardExchange(const std::string& name, const std::string& type); @@ -172,6 +174,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DtxManager& getDtxManager() { return dtxManager; } DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } + QueueEvents& getQueueEvents() { return queueEvents; } SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9089ba0c54..5acc474aa1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -21,6 +21,7 @@ #include "Broker.h" #include "Queue.h" +#include "QueueEvents.h" #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" @@ -64,8 +65,11 @@ const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); -} +const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const int ENQUEUE_ONLY=1; +const int ENQUEUE_AND_DEQUEUE=2; +} Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -85,7 +89,9 @@ Queue::Queue(const string& _name, bool _autodelete, inLastNodeFailure(false), persistenceId(0), policyExceeded(false), - mgmtObject(0) + mgmtObject(0), + eventMode(0), + eventMgr(0) { if (parent != 0) { @@ -207,6 +213,25 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +{ + Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "Attempting to acquire message at " << position); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == position) { + message = *i; + if (lastValueQueue) { + clearLVQIndex(*i); + } + messages.erase(i); + QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); + return true; + } + } + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; +} + bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "attempting to acquire " << msg.position); @@ -515,13 +540,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ lvq[key] = msg; }else { i->second->setReplacementMessage(msg,this); - qm.payload = i->second; - dequeued(qm); + dequeued(QueuedMessage(qm.queue, i->second, qm.position)); } }else { messages.push_back(qm); listeners.populate(copy); } + if (eventMode && eventMgr) { + eventMgr->enqueued(qm); + } } copy.notify(); } @@ -659,6 +686,9 @@ void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { + eventMgr->dequeued(msg); + } } @@ -698,6 +728,8 @@ void Queue::configure(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (mgmtObject != 0) mgmtObject->set_arguments (_settings); } @@ -898,3 +930,10 @@ void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; } + +int Queue::getEventMode() { return eventMode; } + +void Queue::setQueueEventManager(QueueEvents& mgr) +{ + eventMgr = &mgr; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e0bcc25fa3..394b5fd054 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -50,6 +50,7 @@ namespace qpid { namespace broker { class Broker; class MessageStore; + class QueueEvents; class QueueRegistry; class TransactionContext; class Exchange; @@ -96,6 +97,8 @@ namespace qpid { framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; RateTracker dequeueTracker; + int eventMode; + QueueEvents* eventMgr; void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -166,6 +169,7 @@ namespace qpid { void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); bool acquire(const QueuedMessage& msg); + bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** * Delivers a message to the queue. Will record it as @@ -279,6 +283,8 @@ namespace qpid { * Used by cluster to replicate queues. */ void setPosition(framing::SequenceNumber pos); + int getEventMode(); + void setQueueEventManager(QueueEvents&); }; } } diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp new file mode 100644 index 0000000000..a6517e1bfe --- /dev/null +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueEvents.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) +{ + eventQueue.start(); +} + +QueueEvents::~QueueEvents() +{ + eventQueue.stop(); +} + +void QueueEvents::enqueued(const QueuedMessage& m) +{ + eventQueue.push(Event(ENQUEUE, m)); +} + +void QueueEvents::dequeued(const QueuedMessage& m) +{ + eventQueue.push(Event(DEQUEUE, m)); +} + +void QueueEvents::registerListener(const std::string& id, const EventListener& listener) +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (listeners.find(id) == listeners.end()) { + listeners[id] = listener; + } else { + throw Exception(QPID_MSG("Event listener already registered for '" << id << "'")); + } +} + +void QueueEvents::unregisterListener(const std::string& id) +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (listeners.find(id) == listeners.end()) { + throw Exception(QPID_MSG("No event listener registered for '" << id << "'")); + } else { + listeners.erase(id); + } +} + +void QueueEvents::handle(EventQueue::Queue& events) +{ + qpid::sys::Mutex::ScopedLock l(lock); + while (!events.empty()) { + for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { + i->second(events.front()); + } + events.pop_front(); + } +} + +void QueueEvents::shutdown() +{ + if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); +} + +QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h new file mode 100644 index 0000000000..2ba69e33e6 --- /dev/null +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -0,0 +1,77 @@ +#ifndef QPID_BROKER_QUEUEEVENTS_H +#define QPID_BROKER_QUEUEEVENTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueuedMessage.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/PollableQueue.h" +#include <map> +#include <string> +#include <boost/function.hpp> + +namespace qpid { +namespace broker { + +/** + * Event manager for queue events. Allows queues to indicate when + * events have occured; allows listeners to register for notification + * of this. The notification happens asynchronously, in a separate + * thread. + */ +class QueueEvents +{ + public: + enum EventType {ENQUEUE, DEQUEUE}; + + struct Event + { + EventType type; + QueuedMessage msg; + + Event(EventType, const QueuedMessage&); + }; + + typedef boost::function<void (Event)> EventListener; + + QueueEvents(const boost::shared_ptr<sys::Poller>& poller); + ~QueueEvents(); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + void registerListener(const std::string& id, const EventListener&); + void unregisterListener(const std::string& id); + //process all outstanding events + void shutdown(); + private: + typedef qpid::sys::PollableQueue<Event> EventQueue; + typedef std::map<std::string, EventListener> Listeners; + + EventQueue eventQueue; + Listeners listeners; + qpid::sys::Mutex lock;//protect listeners from concurrent access + + void handle(EventQueue::Queue& e); + +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEEVENTS_H*/ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 4450d56efb..0966db8162 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -362,6 +362,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); + //if event generation is turned on, pass in a pointer to + //the QueueEvents instance to use + if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents()); + //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index ac65b0bc22..5493c05044 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -24,6 +24,8 @@ namespace qpid { namespace client { +enum QueueEventGeneration {ENQUEUE_ONLY=1, ENQUEUE_AND_DEQUEUE=2}; + QueueOptions::QueueOptions() {} @@ -39,6 +41,7 @@ const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue"); const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node"); const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key"); const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); +const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation"); QueueOptions::~QueueOptions() @@ -109,6 +112,10 @@ void QueueOptions::clearOrdering() erase(strLastValueQueue); } +void QueueOptions::enableQueueEvents(bool enqueueOnly) +{ + setInt(strQueueEventMode, enqueueOnly ? ENQUEUE_ONLY : ENQUEUE_AND_DEQUEUE); +} } } diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h index 114e1e49c2..8c22414cbb 100644 --- a/cpp/src/qpid/client/QueueOptions.h +++ b/cpp/src/qpid/client/QueueOptions.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { enum QueueSizePolicy {NONE, REJECT, FLOW_TO_DISK, RING, RING_STRICT}; -enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE}; +enum QueueOrderingPolicy {FIFO, LVQ, LVQ_NO_BROWSE}; /** * A help class to set options on the Queue. Create a configured args while @@ -83,6 +83,13 @@ class QueueOptions: public framing::FieldTable * Use default odering policy */ void clearOrdering(); + + /** + * Turns on event generation for this queue (either enqueue only + * or for enqueue and dequeue events); the events can then be + * processed by a regsitered broker plugin. + */ + void enableQueueEvents(bool enqueueOnly); static const std::string strMaxCountKey; static const std::string strMaxSizeKey; @@ -95,6 +102,7 @@ class QueueOptions: public framing::FieldTable static const std::string strPersistLastNode; static const std::string strLVQMatchProperty; static const std::string strLastValueQueueNoBrowse; + static const std::string strQueueEventMode; }; } diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp new file mode 100644 index 0000000000..80ff77d107 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicatingEventListener.h" +#include "constants.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueueEvents.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace replication { + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::replication::constants; + +void ReplicatingEventListener::handle(QueueEvents::Event event) +{ + //create event message and enqueue it on replication queue + FieldTable headers; + boost::intrusive_ptr<Message> message; + switch (event.type) { + case QueueEvents::ENQUEUE: + headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE); + headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); + message = createEventMessage(headers); + queue->deliver(message); + //if its an enqueue, enqueue the message itself on the + //replication queue also: + queue->deliver(event.msg.payload); + QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication"); + break; + case QueueEvents::DEQUEUE: + headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE); + headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName()); + headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position); + message = createEventMessage(headers); + queue->deliver(message); + QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " + << event.msg.position << ")"); + break; + } +} + +namespace { +const std::string EMPTY; +} + +boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers) +{ + boost::intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + header.setBof(false); + header.setEof(true); + header.setBos(true); + header.setEos(true); + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setApplicationHeaders(headers); + return msg; +} + +Options* ReplicatingEventListener::getOptions() +{ + return &options; +} + +void ReplicatingEventListener::initialize(Plugin::Target& target) +{ + Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (broker && !options.queue.empty()) { + if (options.createQueue) { + queue = broker->getQueues().declare(options.queue).first; + } else { + queue = broker->getQueues().find(options.queue); + } + if (queue) { + QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); + broker->getQueueEvents().registerListener(options.name, callback); + QPID_LOG(info, "Registered replicating queue event listener"); + } else { + QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled."); + } + } +} + +void ReplicatingEventListener::earlyInitialize(Target&) {} + +ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), + name("replicator"), + createQueue(false) +{ + addOptions() + ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded") + ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener") + ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist"); +} + +static ReplicatingEventListener plugin; + +}} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h new file mode 100644 index 0000000000..25e2a5b7b9 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicatingEventListener.h @@ -0,0 +1,66 @@ +#ifndef QPID_REPLICATION_REPLICATINGEVENTLISTENER_H +#define QPID_REPLICATION_REPLICATINGEVENTLISTENER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueEvents.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace replication { + +/** + * An event listener plugin that records queue events as messages on a + * replication queue, from where they can be consumed (e.g. by an + * inter-broker link to the corresponding QueueReplicationExchange + * plugin. + */ +class ReplicatingEventListener : public Plugin +{ + public: + Options* getOptions(); + void earlyInitialize(Plugin::Target& target); + void initialize(Plugin::Target& target); + void handle(qpid::broker::QueueEvents::Event); + private: + struct PluginOptions : public Options + { + std::string queue; + std::string name; + bool createQueue; + + PluginOptions(); + }; + + PluginOptions options; + qpid::broker::Queue::shared_ptr queue; + + boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers); +}; + +}} // namespace qpid::replication + +#endif /*!QPID_REPLICATION_REPLICATINGEVENTLISTENER_H*/ diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp new file mode 100644 index 0000000000..abe8a4dfb6 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicationExchange.h" +#include "constants.h" +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace replication { + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::replication::constants; + +ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, + const FieldTable& args, + QueueRegistry& qr, + Manageable* parent) + : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {} + +std::string ReplicationExchange::getType() const { return typeName; } + +void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args) +{ + if (args) { + std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE); + if (eventType == ENQUEUE) { + expectingEnqueue = true; + targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE); + QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue); + return; + } else if (eventType == DEQUEUE) { + std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE); + Queue::shared_ptr queue = queues.find(queueName); + SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION)); + + QueuedMessage dequeued; + if (queue->acquireMessageAt(position, dequeued)) { + queue->dequeue(0, dequeued); + QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position); + } else { + QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName); + } + + return; + } else if (!eventType.empty()) { + throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType)); + } + } + //if we get here assume its not an event message, assume its an enqueue + if (expectingEnqueue) { + Queue::shared_ptr queue = queues.find(targetQueue); + msg.deliverTo(queue); + expectingEnqueue = false; + targetQueue.clear(); + QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue); + } else { + QPID_LOG(warning, "Dropping unexpected message"); + } +} + +bool ReplicationExchange::bind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/) +{ + throw NotImplementedException("Replication exchange does not support bind operation"); +} + +bool ReplicationExchange::unbind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/) +{ + throw NotImplementedException("Replication exchange does not support unbind operation"); +} + +bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/) +{ + return false; +} + +const std::string ReplicationExchange::typeName("replication"); + + +struct ReplicationExchangePlugin : Plugin +{ + Broker* broker; + + ReplicationExchangePlugin(); + void earlyInitialize(Plugin::Target& target); + void initialize(Plugin::Target& target); + Exchange::shared_ptr create(const std::string& name, bool durable, + const framing::FieldTable& args, + management::Manageable* parent); +}; + +ReplicationExchangePlugin::ReplicationExchangePlugin() : broker(0) {} + +Exchange::shared_ptr ReplicationExchangePlugin::create(const std::string& name, bool durable, + const framing::FieldTable& args, + management::Manageable* parent) +{ + Exchange::shared_ptr e(new ReplicationExchange(name, durable, args, broker->getQueues(), parent)); + return e; +} + + +void ReplicationExchangePlugin::initialize(Plugin::Target& target) +{ + broker = dynamic_cast<broker::Broker*>(&target); + if (broker) { + ExchangeRegistry::FactoryFunction f = boost::bind(&ReplicationExchangePlugin::create, this, _1, _2, _3, _4); + broker->getExchanges().registerType(ReplicationExchange::typeName, f); + QPID_LOG(info, "Registered replication exchange"); + } +} + +void ReplicationExchangePlugin::earlyInitialize(Target&) {} + +static ReplicationExchangePlugin exchangePlugin; + +}} // namespace qpid::replication diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h new file mode 100644 index 0000000000..ed2b5956b6 --- /dev/null +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -0,0 +1,59 @@ +#ifndef QPID_REPLICATION_REPLICATIONEXCHANGE_H +#define QPID_REPLICATION_REPLICATIONEXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Exchange.h" + +namespace qpid { +namespace replication { + +/** + * A custom exchange plugin that processes incoming messages + * representing enqueue or dequeue events for particular queues and + * carries out the corresponding action to replicate that on the local + * broker. + */ +class ReplicationExchange : public qpid::broker::Exchange +{ + public: + static const std::string typeName; + + ReplicationExchange(const std::string& name, bool durable, + const qpid::framing::FieldTable& args, + qpid::broker::QueueRegistry& queues, + qpid::management::Manageable* parent = 0); + + std::string getType() const; + + void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); + + bool bind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool unbind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); + private: + qpid::broker::QueueRegistry& queues; + bool expectingEnqueue; + std::string targetQueue; +}; +}} // namespace qpid::replication + +#endif /*!QPID_REPLICATION_REPLICATIONEXCHANGE_H*/ diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h new file mode 100644 index 0000000000..b0cef7570c --- /dev/null +++ b/cpp/src/qpid/replication/constants.h @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace replication { +namespace constants { + +const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type"); +const std::string ENQUEUE("enqueue"); +const std::string DEQUEUE("dequeue"); +const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue"); +const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position"); + +}}} |
