summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
committerGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
commitafefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch)
tree70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/broker/Bridge.cpp
parent1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff)
downloadqpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates * added support for acknowledged transfer over inter-broker bridges * added option to qpid-route to control this git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp13
1 files changed, 9 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index f9cb7ccd3c..6129f13ede 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -68,7 +68,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic);
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
if (!args.i_durable)
agent->addObject(mgmtObject);
}
@@ -81,6 +81,8 @@ Bridge::~Bridge()
void Bridge::create(ConnectionState& c)
{
+ FieldTable options;
+ if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
connState = &c;
if (args.i_srcIsLocal) {
if (args.i_dynamic)
@@ -103,7 +105,7 @@ void Bridge::create(ConnectionState& c)
session->commandPoint(0,0);
if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
@@ -194,9 +196,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(id);
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
+ uint16_t sync = buffer.getShort();
return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic).first;
+ is_queue, is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -213,6 +216,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
+ buffer.putShort(args.i_sync);
}
uint32_t Bridge::encodedSize() const
@@ -228,7 +232,8 @@ uint32_t Bridge::encodedSize() const
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
- + 1; // dynamic
+ + 1 // dynamic
+ + 2; // sync
}
management::ManagementObject* Bridge::GetManagementObject (void) const