diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Bridge.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 65 |
1 files changed, 37 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 4604ac643f..75aba70ae8 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/broker/Bridge.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/FedOps.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/Connection.h" @@ -49,6 +51,11 @@ using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string NONE("none"); +} + namespace qpid { namespace broker { @@ -60,7 +67,7 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init, const std::string& _queueName, const string& ae) : - link(_link), channel(_id), args(_args), mgmtObject(0), + link(_link), channel(_id), args(_args), listener(l), name(_name), queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() : _queueName), @@ -71,10 +78,10 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, { ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Bridge + mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } @@ -121,28 +128,30 @@ void Bridge::create(Connection& c, AsyncStore* const store) peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); } else { - FieldTable queueSettings; - - if (args.i_tag.size()) { - queueSettings.setString("qpid.trace.id", args.i_tag); - } else { - const string& peerTag = c.getFederationPeerTag(); - if (peerTag.size()) - queueSettings.setString("qpid.trace.id", peerTag); + if (!useExistingQueue) { + FieldTable queueSettings; + + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.id", peerTag); + } + + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.exclude", localTag); + } + + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? + bool exclusive = true; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); } - - if (args.i_excludes.size()) { - queueSettings.setString("qpid.trace.exclude", args.i_excludes); - } else { - const string& localTag = link->getBroker()->getFederationTag(); - if (localTag.size()) - queueSettings.setString("qpid.trace.exclude", localTag); - } - - bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? - bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge - bool autoDelete = exclusive && !durable;//auto delete transient queues? - peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); if (!args.i_dynamic) peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options); @@ -296,9 +305,9 @@ uint32_t Bridge::encodedSize() const + 2; // sync } -management::ManagementObject* Bridge::GetManagementObject (void) const +management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const { - return (management::ManagementObject*) mgmtObject; + return mgmtObject; } management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, @@ -331,6 +340,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, } string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + bindArgs.setString(QPID_REPLICATE, NONE); bindArgs.setString(qpidFedOp, op); bindArgs.setString(qpidFedTags, newTagList); if (origin.empty()) @@ -366,8 +376,7 @@ void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchang if (resetProxy()) { peer->getExchange().bind(queue, exchange, key, args); } else { - QPID_LOG(error, "Cannot propagate binding for dynamic bridge as session has been detached, deleting dynamic bridge"); - close(); + // link's periodic maintenance visit will attempt to recover } } |