diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 4 |
4 files changed, 12 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 91a4bf242b..6a17555b6f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -299,10 +299,12 @@ void BrokerReplicator::route(Deliverable& msg) { } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() << ": while handling: " << list); + haBroker.shutdown(); throw; } } + void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { Variant::Map argsMap = asMapVoid(values[ARGS]); bool autoDel = values[AUTODEL].asBool(); @@ -542,7 +544,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu { if (replicationTest.replicateLevel(queue->getSettings()) == ALL) { boost::shared_ptr<QueueReplicator> qr( - new QueueReplicator(haBroker.getBrokerInfo(), queue, link)); + new QueueReplicator(haBroker, queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index fc74ce633a..d126639813 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -193,7 +193,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, link->setUrl(url); // Create a queue replicator boost::shared_ptr<QueueReplicator> qr( - new QueueReplicator(brokerInfo, queue, link)); + new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index a55fa00562..be910a087f 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,6 +19,7 @@ * */ +#include "HaBroker.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" @@ -58,12 +59,13 @@ bool QueueReplicator::isEventKey(const std::string key) { return ret; } -QueueReplicator::QueueReplicator(const BrokerInfo& info, +QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), + haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(info) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()) { Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -183,6 +185,7 @@ void QueueReplicator::route(Deliverable& msg) } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); + haBroker.shutdown(); throw; } } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 6754174076..f8a68ea38f 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -40,6 +40,7 @@ class Deliverable; } namespace ha { +class HaBroker; /** * Exchange created on a backup broker to replicate a queue on the primary. @@ -60,7 +61,7 @@ class QueueReplicator : public broker::Exchange, /** Test if a string is an event key */ static bool isEventKey(const std::string key); - QueueReplicator(const BrokerInfo&, + QueueReplicator(HaBroker&, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); @@ -80,6 +81,7 @@ class QueueReplicator : public broker::Exchange, void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); + HaBroker& haBroker; std::string logPrefix; std::string bridgeName; sys::Mutex lock; |
