diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 30 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 12 |
5 files changed, 41 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 9d374e7a4f..beeaa318cc 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -312,9 +312,14 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable()); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable()); //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + FieldTable arguments; + arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + peer.getMessage().subscribe( + queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/, + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(args.i_dest, 1); // Window + peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages()); + peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes()); // Issue a query request for queues, exchanges, bindings and the habroker // using event queue as the reply-to address diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index f7fe553d9b..9a06a227a5 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -48,6 +48,10 @@ struct Options : public qpid::Options { "Authentication mechanism for connections between HA brokers") ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"), "Maximum time to wait for an expected backup to connect and become ready.") + ("ha-flow-messages", optValue(settings.flowMessages, "N"), + "Flow control message count limit for replication, 0 means no limit") + ("ha-flow-bytes", optValue(settings.flowBytes, "N"), + "Flow control byte limit for replication, 0 means no limit") ; } }; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 6d30a5c10c..ea76763425 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -22,6 +22,7 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -39,7 +40,6 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); const std::string TYPE_NAME("qpid.queue-replicator"); -const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); } namespace qpid { @@ -52,6 +52,7 @@ using sys::Mutex; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; @@ -107,7 +108,8 @@ QueueReplicator::QueueReplicator(HaBroker& hb, : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), + settings(hb.getSettings()) { args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); @@ -165,23 +167,21 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - FieldTable settings; - settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_BACK, - queue->getPosition()); - settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, - brokerInfo.asFieldTable()); + FieldTable arguments; + arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition()); + arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable()); SequenceNumber front, back; queue->getRange(front, back, broker::REPLICATOR); - if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front); + if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front); try { peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, - false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window? - peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); - peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(getName(), 1); // Window + peer.getMessage().flow(getName(), 0, settings.getFlowMessages()); + peer.getMessage().flow(getName(), 1, settings.getFlowBytes()); } catch(const exception& e) { QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); @@ -190,7 +190,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa qpid::Address primary; link->getRemoteAddress(primary); QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); - QPID_LOG(trace, logPrefix << "Subscription settings: " << settings); + QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } namespace { diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 5fdc022cb1..757605a23a 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -41,6 +41,7 @@ class Deliverable; namespace ha { class HaBroker; +class Settings; /** * Exchange created on a backup broker to replicate a queue on the primary. @@ -57,6 +58,8 @@ class QueueReplicator : public broker::Exchange, public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; + static const std::string QPID_SYNC_FREQUENCY; + static std::string replicatorName(const std::string& queueName); static bool isReplicatorName(const std::string&); @@ -101,6 +104,7 @@ class QueueReplicator : public broker::Exchange, boost::shared_ptr<broker::Bridge> bridge; BrokerInfo brokerInfo; bool subscribed; + const Settings& settings; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 37235b5c79..1be068063a 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -23,6 +23,7 @@ */ #include "types.h" +#include "qpid/sys/IntegerTypes.h" #include <string> namespace qpid { @@ -34,7 +35,8 @@ namespace ha { class Settings { public: - Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5) + Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5), + flowMessages(100), flowBytes(0) {} bool cluster; // True if we are a cluster member. @@ -43,7 +45,13 @@ class Settings Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; double backupTimeout; - private: + + uint32_t flowMessages, flowBytes; + + static const uint32_t NO_LIMIT=0xFFFFFFFF; + static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; } + uint32_t getFlowMessages() const { return flowValue(flowMessages); } + uint32_t getFlowBytes() const { return flowValue(flowBytes); } }; }} // namespace qpid::ha |
