diff options
| author | Alan Conway <aconway@apache.org> | 2012-03-05 21:31:58 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-03-05 21:31:58 +0000 |
| commit | 47fb332cc91310afbb49af74b5d0f11b1efdaaa1 (patch) | |
| tree | 8570c558bd8db0cc428a9f5b72b78b4fad2f6ac9 /qpid/cpp | |
| parent | 15f7c4dd7936a34151b748a4ddbf7cdc2bdb87f0 (diff) | |
| download | qpid-python-47fb332cc91310afbb49af74b5d0f11b1efdaaa1.tar.gz | |
QPID-3603: Initial documentation for the new HA plug-in.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1297234 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/design_docs/new-ha-design.txt | 83 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 8 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 42 |
3 files changed, 44 insertions, 89 deletions
diff --git a/qpid/cpp/design_docs/new-ha-design.txt b/qpid/cpp/design_docs/new-ha-design.txt index 24e28122f8..acca1720b4 100644 --- a/qpid/cpp/design_docs/new-ha-design.txt +++ b/qpid/cpp/design_docs/new-ha-design.txt @@ -257,20 +257,24 @@ Broker startup with store: - When connecting as backup, check UUID matches primary, shut down if not. - Empty: start ok, no UUID check with primary. -** Current Limitations +* Current Limitations (In no particular order at present) For message replication: -LM1 - The re-synchronisation does not handle the case where a newly elected -primary is *behind* one of the other backups. To address this I propose -a new event for restting the sequence that the new primary would send -out on detecting that a replicating browser is ahead of it, requesting -that the replica revert back to a particular sequence number. The -replica on receiving this event would then discard (i.e. dequeue) all -the messages ahead of that sequence number and reset the counter to -correctly sequence any subsequently delivered messages. +LM1a - On failover, backups delete their queues and download the full queue state from the +primary. There was code to use messags already on the backup for re-synchronisation, it +was removed in early development (r1214490) to simplify the logic while getting basic +replication working. It needs to be re-introduced. + +LM1b - This re-synchronisation does not handle the case where a newly elected primary is *behind* +one of the other backups. To address this I propose a new event for restting the sequence +that the new primary would send out on detecting that a replicating browser is ahead of +it, requesting that the replica revert back to a particular sequence number. The replica +on receiving this event would then discard (i.e. dequeue) all the messages ahead of that +sequence number and reset the counter to correctly sequence any subsequently delivered +messages. LM2 - There is a need to handle wrap-around of the message sequence to avoid confusing the resynchronisation where a replica has been disconnected @@ -349,6 +353,12 @@ LC6 - The events and query responses are not fully synchronized. It is not possible to miss a create event and yet not to have the object in question in the query response however. +LC7 Federated links from the primary will be lost in failover, they will not be re-connected on +the new primary. Federation links to the primary can fail over. + +LC8 Only plain FIFO queues can be replicated. LVQs and ring queues are not yet supported. + +LC9 The "last man standing" feature of the old cluster is not available. * Benefits compared to previous cluster implementation. @@ -359,58 +369,3 @@ LC6 - The events and query responses are not fully synchronized. - Can take advantage of resource manager features, e.g. virtual IP addresses. - Fewer inconsistent errors (store failures) that can be handled without killing brokers. - Improved performance -* User Documentation Notes - -Notes to seed initial user documentation. Loosely tracking the implementation, -some points mentioned in the doc may not be implemented yet. - -** High Availability Overview - -HA is implemented using a 'hot standby' approach. Clients are directed -to a single "primary" broker. The primary executes client requests and -also replicates them to one or more "backup" brokers. If the primary -fails, one of the backups takes over the role of primary carrying on -from where the primary left off. Clients will fail over to the new -primary automatically and continue their work. - -TODO: at least once, deduplication. - -** Enabling replication on the client. - -To enable replication set the qpid.replicate argument when creating a -queue or exchange. - -This can have one of 3 values -- none: the object is not replicated -- configuration: queues, exchanges and bindings are replicated but messages are not. -- messages: configuration and messages are replicated. - -TODO: examples -TODO: more options for default value of qpid.replicate - -A HA client connection has multiple addresses, one for each broker. If -the it fails to connect to an address, or the connection breaks, -it will automatically fail-over to another address. - -Only the primary broker accepts connections, the backup brokers -redirect connection attempts to the primary. If the primary fails, one -of the backups is promoted to primary and clients fail-over to the new -primary. - -TODO: using multiple-address connections, examples c++, python, java. - -TODO: dynamic cluster addressing? - -TODO: need de-duplication. - -** Enabling replication on the broker. - -Network topology: backup links, separate client/broker networks. -Describe failover mechanisms. -- Client view: URLs, failover, exclusion & discovery. -- Broker view: similar. -Role of rmganager - -** Configuring rgmanager - -** Configuring qpidd diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 85b97e7e3e..609a3378ad 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -113,15 +113,15 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES }; +enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL }; const string S_NONE="none"; const string S_CONFIGURATION="configuration"; -const string S_MESSAGES="messages"; +const string S_ALL="all"; ReplicateLevel replicateLevel(const string& level) { if (level == S_NONE) return RL_NONE; if (level == S_CONFIGURATION) return RL_CONFIGURATION; - if (level == S_MESSAGES) return RL_MESSAGES; + if (level == S_ALL) return RL_ALL; throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); } @@ -491,7 +491,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { } void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - if (replicateLevel(queue->getSettings()) == RL_MESSAGES) { + if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); broker.getExchanges().registerExchange(qr); qr->activate(); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 0ab876ecab..822e07c702 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -58,11 +58,11 @@ class HaBroker(Broker): def config_replicate(self, from_broker, queue): assert os.system( - "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 def config_declare(self, queue, replication): assert os.system( - "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0 + "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 class HaCluster(object): _cluster_count = 0 @@ -98,7 +98,7 @@ class HaCluster(object): def __iter__(self): return self._brokers.__iter__() -def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value +def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value class ShortTests(BrokerTest): """Short HA functionality tests.""" @@ -148,18 +148,18 @@ class ShortTests(BrokerTest): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) def setup(p, prefix, primary): """Create config, send messages on the primary p""" - s = p.sender(queue(prefix+"q1", "messages")) + s = p.sender(queue(prefix+"q1", "all")) for m in ["a", "b", "1"]: s.send(Message(m)) # Test replication of dequeue self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") p.acknowledge() p.sender(queue(prefix+"q2", "configuration")).send(Message("2")) p.sender(queue(prefix+"q3", "none")).send(Message("3")) - p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4")) - p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5")) + p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) + p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) # Test unbind - p.sender(queue(prefix+"q4", "messages")).send(Message("6")) - s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4")) + p.sender(queue(prefix+"q4", "all")).send(Message("6")) + s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4")) s3.send(Message("7")) # Use old connection to unbind us = primary.connect_old().session(str(uuid4())) @@ -204,7 +204,7 @@ class ShortTests(BrokerTest): verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. - s = p.sender(queue("foo","messages")) + s = p.sender(queue("foo","all")) self.wait(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) @@ -232,7 +232,7 @@ class ShortTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() - s = p.sender(queue("q","messages")) + s = p.sender(queue("q","all")) for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -260,14 +260,14 @@ class ShortTests(BrokerTest): sender = self.popen( ["qpid-send", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("messages")), + "--address", "q;{create:always,%s}"%(qr_node("all")), "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("messages")), + "--address", "q;{create:always,%s}"%(qr_node("all")), "--messages=990", "--timeout=10" ]) @@ -352,7 +352,7 @@ class ShortTests(BrokerTest): def test_qpid_config_replication(self): """Set up replication via qpid-config""" brokers = HaCluster(self,2) - brokers[0].config_declare("q","messages") + brokers[0].config_declare("q","all") brokers[0].connect().session().sender("q").send("foo") self.assert_browse_backup(brokers[1], "q", ["foo"]) @@ -389,8 +389,8 @@ class ShortTests(BrokerTest): cluster = HaCluster(self, 2) primary = cluster[0] pc = cluster.connect(0) - ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages")) - pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages")) + ps = pc.session().sender("q;{create:always,%s}"%qr_node("all")) + pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all")) backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") @@ -410,7 +410,7 @@ class ShortTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':messages}}}}") + s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}") def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: send(*kv) @@ -426,7 +426,7 @@ class ShortTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':messages}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}") for i in range(10): s.send(Message(str(i))) self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) @@ -435,7 +435,7 @@ class ShortTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':messages}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}") try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass @@ -447,7 +447,7 @@ class ShortTests(BrokerTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) session = primary.connect().session() - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':messages}}}}") + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) # Can't use browse_backup as browser sees messages in delivery order not priority. @@ -466,7 +466,7 @@ class ShortTests(BrokerTest): priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':messages}}}}"%(levels,limit_policy)) + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy)) messages = [Message(content=str(uuid4()), priority = p) for p in priorities] for m in messages: s.send(m) self.wait_backup(backup, s.target) @@ -480,7 +480,7 @@ class ShortTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':messages}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low |
