diff options
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 43 |
1 files changed, 23 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 456eba7f9d..a8e7b3c368 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -31,10 +31,12 @@ using qpid::framing::Uuid; namespace qpid { namespace broker { -Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) : - args(_args), channel(id, &(c.getOutput())), peer(channel), - mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)), - connection(c), listener(l), name(Uuid(true).str()) +Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l, + const management::ArgsLinkBridge& _args) : + id(_id), args(_args), + mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest, + args.i_key, args.i_src_is_queue, args.i_src_is_local)), + listener(l), name(Uuid(true).str()) { management::ManagementAgent::getAgent()->addObject(mgmtObject); } @@ -44,18 +46,21 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create() +void Bridge::create(ConnectionState& c) { - framing::AMQP_ServerProxy::Session session(channel); - session.attach(name, false); + channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(name, false); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { if (args.i_src_is_queue) { - peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { string queue = "bridge_queue_"; queue += Uuid(true).str(); @@ -66,22 +71,22 @@ void Bridge::create() if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } + bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? bool autoDelete = !durable;//auto delete transient queues? - peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); - peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } - } void Bridge::cancel() { - peer.getMessage().cancel(args.i_dest); - peer.getSession().detach(name); + peer->getMessage().cancel(args.i_dest); + peer->getSession().detach(name); } management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const @@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man if (methodId == management::Bridge::METHOD_CLOSE) { //notify that we are closed listener(this); - //request time on the connections io thread - connection.getOutput().activateOutput(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; |
