diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/QueueReplicator.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 161 |
1 files changed, 118 insertions, 43 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index cac1fdac29..98220b2098 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -22,12 +22,15 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -37,43 +40,89 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); const std::string TYPE_NAME("qpid.queue-replicator"); -const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +const std::string QPID_HA("qpid.ha-"); } namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace std; +using sys::Mutex; -const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:"); -const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); -const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue"); +const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position"); +const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } +bool QueueReplicator::isReplicatorName(const std::string& name) { + return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; +} + bool QueueReplicator::isEventKey(const std::string key) { - const std::string& prefix = QPID_HA_EVENT_PREFIX; + const std::string& prefix = QPID_HA; bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; return ret; } +class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& prefix) : logPrefix(prefix) {} + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached"); + } + private: + std::string logPrefix; +}; + +class QueueReplicator::QueueObserver : public broker::QueueObserver { + public: + QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {} + void enqueued(const Message&) {} + void dequeued(const Message&) {} + void acquired(const Message&) {} + void requeued(const Message&) {} + void consumerAdded( const Consumer& ) {} + void consumerRemoved( const Consumer& ) {} + // Queue observer is destroyed when the queue is. + void destroy() { queueReplicator->destroy(); } + private: + boost::shared_ptr<QueueReplicator> queueReplicator; +}; + QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), + settings(hb.getSettings()) { + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); } // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -93,48 +142,57 @@ void QueueReplicator::activate() { boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); bridge = result.first; + bridge->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); + boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); + queue->addObserver(observer); } -QueueReplicator::~QueueReplicator() { deactivate(); } +QueueReplicator::~QueueReplicator() {} -void QueueReplicator::deactivate() { - // destroy the route - sys::Mutex::ScopedLock l(lock); - if (bridge) { - bridge->close(); - bridge.reset(); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); - } +void QueueReplicator::destroy() { + // Called from Queue::destroyed() + Mutex::ScopedLock l(lock); + if (!bridge) return; + QPID_LOG(debug, logPrefix << "Destroyed."); + bridge->close(); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); } // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - FieldTable settings; - settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_BACK, - queue->getPosition()); - settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, - brokerInfo.asFieldTable()); - SequenceNumber front; - if (ReplicatingSubscription::getFront(*queue, front)) { - settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front); + FieldTable arguments; + arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition()); + arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable()); + SequenceNumber front, back; + queue->getRange(front, back, broker::REPLICATOR); + if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front); + try { + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(getName(), 1); // Window + peer.getMessage().flow(getName(), 0, settings.getFlowMessages()); + peer.getMessage().flow(getName(), 1, settings.getFlowBytes()); + } + catch(const exception& e) { + QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); + throw; } - peer.getMessage().subscribe( - args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, - false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window? - peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); - peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - qpid::Address primary; link->getRemoteAddress(primary); QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); - QPID_LOG(trace, logPrefix << "Subscription settings: " << settings); + QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } namespace { @@ -147,17 +205,35 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { + boost::shared_ptr<Queue> q; + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + q = queue; + } // Thread safe: only calls thread safe Queue functions. queue->dequeueMessageAt(n); } +namespace { +bool getSequence(const Message& message, SequenceNumber& result) { + result = message.getSequence(); + return true; +} +bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) { + QueueCursor cursor(REPLICATOR); + return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); +} +} // namespace + // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { try { const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. @@ -176,16 +252,15 @@ void QueueReplicator::route(Deliverable& msg) << " to " << position); // Verify that there are no messages after the new position in the queue. SequenceNumber next; - if (ReplicatingSubscription::getNext(*queue, position, next)) - throw Exception("Invalid position move, preceeds messages"); + if (getNext(*queue, position, next)) + throw Exception(QPID_MSG(logPrefix << "Invalid position " << position + << " preceeds message at " << next)); queue->setPosition(position); } // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); - haBroker.shutdown(); - throw; + haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what())); } } |