summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp13
1 files changed, 9 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index f9cb7ccd3c..6129f13ede 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -68,7 +68,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic);
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
if (!args.i_durable)
agent->addObject(mgmtObject);
}
@@ -81,6 +81,8 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
+ FieldTable options;
+ if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
connState = &c;
if (args.i_srcIsLocal) {
if (args.i_dynamic)
@@ -103,7 +105,7 @@ void Bridge::create(ConnectionState& c)
session->commandPoint(0,0);
if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
@@ -194,9 +196,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(id);
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
+ uint16_t sync = buffer.getShort();
return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic).first;
+ is_queue, is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -213,6 +216,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
+ buffer.putShort(args.i_sync);
}
uint32_t Bridge::encodedSize() const
@@ -228,7 +232,8 @@ uint32_t Bridge::encodedSize() const
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
- + 1; // dynamic
+ + 1 // dynamic
+ + 2; // sync
}
management::ManagementObject* Bridge::GetManagementObject (void) const