diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
| commit | afefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch) | |
| tree | 70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/broker/Bridge.cpp | |
| parent | 1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff) | |
| download | qpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz | |
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control this
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index f9cb7ccd3c..6129f13ede 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -68,7 +68,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); if (!args.i_durable) agent->addObject(mgmtObject); } @@ -81,6 +81,8 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { + FieldTable options; + if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); connState = &c; if (args.i_srcIsLocal) { if (args.i_dynamic) @@ -103,7 +105,7 @@ void Bridge::create(ConnectionState& c) session->commandPoint(0,0); if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { @@ -194,9 +196,10 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(id); buffer.getShortString(excludes); bool dynamic(buffer.getOctet()); + uint16_t sync = buffer.getShort(); return links.declare(host, port, durable, src, dest, key, - is_queue, is_local, id, excludes, dynamic).first; + is_queue, is_local, id, excludes, dynamic, sync).first; } void Bridge::encode(Buffer& buffer) const @@ -213,6 +216,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); buffer.putOctet(args.i_dynamic ? 1 : 0); + buffer.putShort(args.i_sync); } uint32_t Bridge::encodedSize() const @@ -228,7 +232,8 @@ uint32_t Bridge::encodedSize() const + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1 - + 1; // dynamic + + 1 // dynamic + + 2; // sync } management::ManagementObject* Bridge::GetManagementObject (void) const |
