authorAlan Conway <aconway@apache.org>2012-03-05 21:31:58 +0000
committerAlan Conway <aconway@apache.org>2012-03-05 21:31:58 +0000
commit47fb332cc91310afbb49af74b5d0f11b1efdaaa1 (patch)
tree8570c558bd8db0cc428a9f5b72b78b4fad2f6ac9 /qpid/cpp
parent15f7c4dd7936a34151b748a4ddbf7cdc2bdb87f0 (diff)
downloadqpid-python-47fb332cc91310afbb49af74b5d0f11b1efdaaa1.tar.gz
QPID-3603: Initial documentation for the new HA plug-in.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297234 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--  qpid/cpp/design_docs/new-ha-design.txt     83
-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerReplicator.cpp   8
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py             42
3 files changed, 44 insertions, 89 deletions
diff --git a/qpid/cpp/design_docs/new-ha-design.txt b/qpid/cpp/design_docs/new-ha-design.txt
index 24e28122f8..acca1720b4 100644
--- a/qpid/cpp/design_docs/new-ha-design.txt
+++ b/qpid/cpp/design_docs/new-ha-design.txt
@@ -257,20 +257,24 @@ Broker startup with store:
- When connecting as backup, check UUID matches primary, shut down if not.
- Empty: start ok, no UUID check with primary.
-** Current Limitations
+* Current Limitations
(In no particular order at present)
For message replication:
-LM1 - The re-synchronisation does not handle the case where a newly elected
-primary is *behind* one of the other backups. To address this I propose
-a new event for restting the sequence that the new primary would send
-out on detecting that a replicating browser is ahead of it, requesting
-that the replica revert back to a particular sequence number. The
-replica on receiving this event would then discard (i.e. dequeue) all
-the messages ahead of that sequence number and reset the counter to
-correctly sequence any subsequently delivered messages.
+LM1a - On failover, backups delete their queues and download the full queue state from the
+primary. There was code to use messages already on the backup for re-synchronisation, but it
+was removed in early development (r1214490) to simplify the logic while getting basic
+replication working. It needs to be re-introduced.
+
+LM1b - This re-synchronisation does not handle the case where a newly elected primary is *behind*
+one of the other backups. To address this I propose a new event for resetting the sequence,
+which the new primary would send out on detecting that a replicating browser is ahead of
+it, requesting that the replica revert to a particular sequence number. The replica,
+on receiving this event, would then discard (i.e. dequeue) all the messages ahead of that
+sequence number and reset the counter to correctly sequence any subsequently delivered
+messages.
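The discard-and-reset behaviour proposed in LM1b can be sketched as follows. This is an illustrative sketch only: `ReplicaQueue`, `on_enqueue`, and `on_reset_sequence` are hypothetical names, not part of the broker's actual code; only the "dequeue everything at or ahead of the target sequence, then reset the counter" logic from the paragraph above is shown.

```python
class ReplicaQueue:
    """Hypothetical sketch of a backup's view of a replicated queue.

    Illustrates only the LM1b proposal: on a reset event, discard all
    messages at or ahead of the given sequence number and reset the
    counter so subsequent deliveries are sequenced correctly.
    """

    def __init__(self):
        self.messages = {}   # sequence number -> message body
        self.next_seq = 0    # sequence assigned to the next enqueue

    def on_enqueue(self, body):
        self.messages[self.next_seq] = body
        self.next_seq += 1

    def on_reset_sequence(self, seq):
        # Dequeue every message the replica holds ahead of the new
        # primary's position, then rewind the counter to match it.
        for s in [s for s in self.messages if s >= seq]:
            del self.messages[s]
        self.next_seq = seq
```

After a reset to sequence N, the replica holds only messages below N, and the next message delivered by the new primary is sequenced as N.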
LM2 - There is a need to handle wrap-around of the message sequence to avoid
confusing the resynchronisation where a replica has been disconnected
@@ -349,6 +353,12 @@ LC6 - The events and query responses are not fully synchronized.
It is not possible to miss a create event and yet not to have
the object in question in the query response however.
+LC7 Federation links from the primary will be lost on failover; they will not be re-connected
+to the new primary. Federation links to the primary can fail over.
+
+LC8 Only plain FIFO queues can be replicated. LVQs and ring queues are not yet supported.
+
+LC9 The "last man standing" feature of the old cluster is not available.
* Benefits compared to previous cluster implementation.
@@ -359,58 +369,3 @@ LC6 - The events and query responses are not fully synchronized.
- Can take advantage of resource manager features, e.g. virtual IP addresses.
- Fewer inconsistent errors (store failures) that can be handled without killing brokers.
- Improved performance
-* User Documentation Notes
-
-Notes to seed initial user documentation. Loosely tracking the implementation,
-some points mentioned in the doc may not be implemented yet.
-
-** High Availability Overview
-
-HA is implemented using a 'hot standby' approach. Clients are directed
-to a single "primary" broker. The primary executes client requests and
-also replicates them to one or more "backup" brokers. If the primary
-fails, one of the backups takes over the role of primary carrying on
-from where the primary left off. Clients will fail over to the new
-primary automatically and continue their work.
-
-TODO: at least once, deduplication.
-
-** Enabling replication on the client.
-
-To enable replication set the qpid.replicate argument when creating a
-queue or exchange.
-
-This can have one of 3 values
-- none: the object is not replicated
-- configuration: queues, exchanges and bindings are replicated but messages are not.
-- messages: configuration and messages are replicated.
-
-TODO: examples
-TODO: more options for default value of qpid.replicate
-
-A HA client connection has multiple addresses, one for each broker. If
-it fails to connect to an address, or if the connection breaks,
-it will automatically fail over to another address.
-
-Only the primary broker accepts connections; the backup brokers
-redirect connection attempts to the primary. If the primary fails, one
-of the backups is promoted to primary and clients fail over to the new
-primary.
-
-TODO: using multiple-address connections, examples c++, python, java.
-
-TODO: dynamic cluster addressing?
-
-TODO: need de-duplication.
-
-** Enabling replication on the broker.
-
-Network topology: backup links, separate client/broker networks.
-Describe failover mechanisms.
-- Client view: URLs, failover, exclusion & discovery.
-- Broker view: similar.
-Role of rgmanager
-
-** Configuring rgmanager
-
-** Configuring qpidd
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 85b97e7e3e..609a3378ad 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -113,15 +113,15 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL };
const string S_NONE="none";
const string S_CONFIGURATION="configuration";
-const string S_MESSAGES="messages";
+const string S_ALL="all";
ReplicateLevel replicateLevel(const string& level) {
if (level == S_NONE) return RL_NONE;
if (level == S_CONFIGURATION) return RL_CONFIGURATION;
- if (level == S_MESSAGES) return RL_MESSAGES;
+ if (level == S_ALL) return RL_ALL;
throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
}
@@ -491,7 +491,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
}
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
broker.getExchanges().registerExchange(qr);
qr->activate();
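The rename above means a queue is now marked for full replication with 'qpid.replicate':all rather than 'messages'. A minimal sketch of the same string-to-level mapping, mirrored in Python for brevity (the `RL_*` names follow the C++ enum; `replicate_level` is an illustrative stand-in, not a real qpid API):

```python
# Mirror of the C++ ReplicateLevel enum after the rename of
# "messages" to "all". The accepted strings are the valid values
# of the qpid.replicate queue/exchange argument.
RL_NONE, RL_CONFIGURATION, RL_ALL = range(3)

_LEVELS = {
    "none": RL_NONE,                    # object not replicated
    "configuration": RL_CONFIGURATION,  # queues/exchanges/bindings only
    "all": RL_ALL,                      # configuration plus messages
}

def replicate_level(level):
    """Parse a qpid.replicate string; reject anything else."""
    try:
        return _LEVELS[level]
    except KeyError:
        raise ValueError("Invalid value for qpid.replicate: %s" % level)
```

Note that the old value "messages" is now rejected, matching the C++ parser, which throws for any string outside the three valid levels.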
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 0ab876ecab..822e07c702 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -58,11 +58,11 @@ class HaBroker(Broker):
def config_replicate(self, from_broker, queue):
assert os.system(
- "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
def config_declare(self, queue, replication):
assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
class HaCluster(object):
_cluster_count = 0
@@ -98,7 +98,7 @@ class HaCluster(object):
def __iter__(self): return self._brokers.__iter__()
-def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
+def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
@@ -148,18 +148,18 @@ class ShortTests(BrokerTest):
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
- s = p.sender(queue(prefix+"q1", "messages"))
+ s = p.sender(queue(prefix+"q1", "all"))
for m in ["a", "b", "1"]: s.send(Message(m))
# Test replication of dequeue
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
p.acknowledge()
p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
# Test unbind
- p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
+ p.sender(queue(prefix+"q4", "all")).send(Message("6"))
+ s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
s3.send(Message("7"))
# Use old connection to unbind
us = primary.connect_old().session(str(uuid4()))
@@ -204,7 +204,7 @@ class ShortTests(BrokerTest):
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
- s = p.sender(queue("foo","messages"))
+ s = p.sender(queue("foo","all"))
self.wait(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
@@ -232,7 +232,7 @@ class ShortTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
- s = p.sender(queue("q","messages"))
+ s = p.sender(queue("q","all"))
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -260,14 +260,14 @@ class ShortTests(BrokerTest):
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("all")),
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("all")),
"--messages=990",
"--timeout=10"
])
@@ -352,7 +352,7 @@ class ShortTests(BrokerTest):
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
- brokers[0].config_declare("q","messages")
+ brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
self.assert_browse_backup(brokers[1], "q", ["foo"])
@@ -389,8 +389,8 @@ class ShortTests(BrokerTest):
cluster = HaCluster(self, 2)
primary = cluster[0]
pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages"))
- pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages"))
+ ps = pc.session().sender("q;{create:always,%s}"%qr_node("all"))
+ pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all"))
backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
@@ -410,7 +410,7 @@ class ShortTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':messages}}}}")
+ s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
@@ -426,7 +426,7 @@ class ShortTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':messages}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
for i in range(10): s.send(Message(str(i)))
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
@@ -435,7 +435,7 @@ class ShortTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':messages}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
@@ -447,7 +447,7 @@ class ShortTests(BrokerTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
session = primary.connect().session()
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':messages}}}}")
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
@@ -466,7 +466,7 @@ class ShortTests(BrokerTest):
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':messages}}}}"%(levels,limit_policy))
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
self.wait_backup(backup, s.target)
@@ -480,7 +480,7 @@ class ShortTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':messages}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low