summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h12
5 files changed, 41 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 9d374e7a4f..beeaa318cc 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -312,9 +312,14 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ FieldTable arguments;
+ arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ peer.getMessage().subscribe(
+ queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+ peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages());
+ peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes());
// Issue a query request for queues, exchanges, bindings and the habroker
// using event queue as the reply-to address
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index f7fe553d9b..9a06a227a5 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -48,6 +48,10 @@ struct Options : public qpid::Options {
"Authentication mechanism for connections between HA brokers")
("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
"Maximum time to wait for an expected backup to connect and become ready.")
+ ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+ "Flow control message count limit for replication, 0 means no limit")
+ ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+ "Flow control byte limit for replication, 0 means no limit")
;
}
};
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 6d30a5c10c..ea76763425 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -22,6 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -39,7 +40,6 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
}
namespace qpid {
@@ -52,6 +52,7 @@ using sys::Mutex;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
@@ -107,7 +108,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ settings(hb.getSettings())
{
args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
@@ -165,23 +167,21 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
- FieldTable settings;
- settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_BACK,
- queue->getPosition());
- settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
- brokerInfo.asFieldTable());
+ FieldTable arguments;
+ arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
+ arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
SequenceNumber front, back;
queue->getRange(front, back, broker::REPLICATOR);
- if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front);
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
- false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window?
- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
- peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(getName(), 1); // Window
+ peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
+ peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
}
catch(const exception& e) {
QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
@@ -190,7 +190,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
- QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
+ QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 5fdc022cb1..757605a23a 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -41,6 +41,7 @@ class Deliverable;
namespace ha {
class HaBroker;
+class Settings;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -57,6 +58,8 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
+ static const std::string QPID_SYNC_FREQUENCY;
+
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
@@ -101,6 +104,7 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Bridge> bridge;
BrokerInfo brokerInfo;
bool subscribed;
+ const Settings& settings;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 37235b5c79..1be068063a 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "qpid/sys/IntegerTypes.h"
#include <string>
namespace qpid {
@@ -34,7 +35,8 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+ Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5),
+ flowMessages(100), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
@@ -43,7 +45,13 @@ class Settings
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
double backupTimeout;
- private:
+
+ uint32_t flowMessages, flowBytes;
+
+ static const uint32_t NO_LIMIT=0xFFFFFFFF;
+ static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; }
+ uint32_t getFlowMessages() const { return flowValue(flowMessages); }
+ uint32_t getFlowBytes() const { return flowValue(flowBytes); }
};
}} // namespace qpid::ha