summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp43
1 files changed, 23 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 456eba7f9d..a8e7b3c368 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -31,10 +31,12 @@ using qpid::framing::Uuid;
namespace qpid {
namespace broker {
-Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const management::ArgsLinkBridge& _args) :
- args(_args), channel(id, &(c.getOutput())), peer(channel),
- mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key, args.i_src_is_queue, args.i_src_is_local)),
- connection(c), listener(l), name(Uuid(true).str())
+Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+ const management::ArgsLinkBridge& _args) :
+ id(_id), args(_args),
+ mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
+ args.i_key, args.i_src_is_queue, args.i_src_is_local)),
+ listener(l), name(Uuid(true).str())
{
management::ManagementAgent::getAgent()->addObject(mgmtObject);
}
@@ -44,18 +46,21 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create()
+void Bridge::create(ConnectionState& c)
{
- framing::AMQP_ServerProxy::Session session(channel);
- session.attach(name, false);
+ channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
@@ -66,22 +71,22 @@ void Bridge::create()
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
+
bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer.getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
- peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
-
}
void Bridge::cancel()
{
- peer.getMessage().cancel(args.i_dest);
- peer.getSession().detach(name);
+ peer->getMessage().cancel(args.i_dest);
+ peer->getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
@@ -94,8 +99,6 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, man
if (methodId == management::Bridge::METHOD_CLOSE) {
//notify that we are closed
listener(this);
- //request time on the connections io thread
- connection.getOutput().activateOutput();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;