author     Stephen D. Huston <shuston@apache.org>   2011-10-21 14:42:12 +0000
committer  Stephen D. Huston <shuston@apache.org>   2011-10-21 14:42:12 +0000
commit     f83677056891e436bf5ba99e79240df2a44528cd (patch)
tree       625bfd644b948e89105630759cf6decb0435354d /cpp/design_docs
parent     ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
Merged out from trunk (QPID-2519)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/design_docs')
-rw-r--r--  cpp/design_docs/hot-standby-design.txt  239
-rw-r--r--  cpp/design_docs/new-cluster-design.txt  302
-rw-r--r--  cpp/design_docs/new-cluster-plan.txt    563
3 files changed, 528 insertions, 576 deletions
diff --git a/cpp/design_docs/hot-standby-design.txt b/cpp/design_docs/hot-standby-design.txt
new file mode 100644
index 0000000000..99a5dc0199
--- /dev/null
+++ b/cpp/design_docs/hot-standby-design.txt
@@ -0,0 +1,239 @@
+-*-org-*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+* Another new design for Qpid clustering.
+
+For background see [[./new-cluster-design.txt]] which describes the issues
+with the old design and a new active-active design that could replace it.
+
+This document describes an alternative hot-standby approach.
+
+** Delivery guarantee
+
+We guarantee N-way redundant, at-least-once delivery. Once a message
+from a client has been acknowledged by the broker, it will be
+delivered even if N-1 brokers subsequently fail. There may be
+duplicates in the event of a failure. We don't create duplicates
+during normal operation (i.e. when no brokers have failed).
+
+This is the same guarantee as the old cluster and the alternative
+active-active design.
+
+** Active-active vs. hot standby (aka primary-backup)
+
+An active-active cluster allows clients to connect to any broker in
+the cluster. If a broker fails, clients can fail over to any other
+live broker.
+
+A hot-standby cluster has only one active broker at a time (the
+"primary") and one or more brokers on standby (the "backups"). Clients
+are only served by the primary; clients that connect to a backup are
+redirected to the primary. The backups are kept up to date in real time
+by the primary; if the primary fails, a backup is elected to be the new
+primary.
+
+Aside: A cold-standby cluster is possible using a standalone broker,
+CMAN and shared storage. In this scenario only one broker runs at a
+time, writing to a shared store. If it fails, another broker is started
+(by CMAN) and recovers from the store. This bears investigation but
+the store recovery time is probably too long for failover.
+
+** Why hot standby?
+
+Active-active has some advantages:
+- Finding a broker on startup or failover is simple, just pick any live broker.
+- All brokers are always running in active mode, there's no
+- Distributing clients across brokers gives better performance, but see [1].
+- A broker failure affects only clients connected to that broker.
+
+The main problem with active-active is co-ordinating consumers of the
+same queue on multiple brokers such that there are no duplicates in
+normal operation. There are 2 approaches:
+
+Predictive: each broker predicts which messages others will take. This
+was the main weakness of the old design, so it is not appealing.
+
+Locking: brokers "lock" a queue in order to take messages. This is
+complex to implement, and it's not straightforward to determine the
+most performant strategy for passing the lock.
+
+Hot-standby removes this problem.
+Only the primary can modify queues, so it just has to tell the backups
+what it is doing; there's no locking.
+
+The primary can enqueue messages and replicate asynchronously -
+exactly like the store does, but it "writes" to the replicas over the
+network rather than writing to disk.
+
+** Failover in a hot-standby cluster.
+
+Hot-standby has some potential performance issues around failover:
+
+- Failover "spike": when the primary fails every client will fail over
+  at the same time, putting strain on the system.
+
+- Until a new primary is elected, the cluster cannot serve any clients
+  or redirect clients to the primary.
+
+We want to minimize the number of re-connect attempts that clients
+have to make. The cluster can use a well-known algorithm to choose the
+new primary (e.g. round robin on a known sequence of brokers) so that
+clients can guess the new primary correctly in most cases.
+
+Even if clients do guess correctly, it may be that the new primary is
+not yet aware of the death of the old primary, which is likely to cause
+multiple failed connect attempts before clients eventually get
+connected. We will need to prototype to see how much this happens in
+reality and how we can best get clients redirected.
+
+** Threading and performance.
+
+The primary-backup cluster operates analogously to the way the disk store does now:
+- use the same MessageStore interface as the store to interact with the broker
+- use the same asynchronous-completion model for replicating messages.
+- use the same recovery interfaces (?) for new backups joining.
+
+Re-using the well-established store design gives credibility to the new cluster design.
+
+The single CPG dispatch thread was a severe performance bottleneck for the old cluster.
+
+The primary has the same threading model as a standalone broker with
+a store, which we know performs well.
+
+If we use CPG for replication of messages, the backups will receive
+messages in the CPG dispatch thread. To get more concurrency, the CPG
+thread can dump work onto internal PollableQueues to be processed in
+parallel.
+
+Messages from the same broker queue need to go onto the same
+PollableQueue. There could be a separate PollableQueue for each broker
+queue. If that's too resource intensive we can use a fixed set of
+PollableQueues and assign broker queues to PollableQueues via hashing
+or round robin.
+
+Another possible optimization is to use multiple CPG groups: one per
+queue or a hashed set, to get more concurrency in the CPG layer. The
+old cluster is not able to keep CPG busy.
+
+TODO: Transactions pose a challenge with these concurrent models: how
+to co-ordinate multiple messages being added (commit a publish or roll
+back an accept) to multiple queues so that all replicas end up with
+the same message sequence while respecting atomicity.
+
+** Use of CPG
+
+CPG provides several benefits in the old cluster:
+- tracking membership (essential for determining the primary)
+- handling "split brain" (integrates with partition support from CMAN)
+- reliable multicast protocol to distribute messages.
+
+I believe we still need CPG for membership and split brain. We could
+experiment with sending the bulk traffic over AMQP connections.
+
+** Flow control
+
+Need to ensure that
+1) In-memory internal queues used by the cluster don't overflow.
+2) The backups don't fall too far behind on processing CPG messages + +** Recovery +When a new backup joins an active cluster it must get a snapshot +from one of the other backups, or the primary if there are none. In +store terms this is "recovery" (old cluster called it an "update) + +Compared to old cluster we only replidate well defined data set of the store. +This is the crucial sore spot of old cluster. + +We can also replicated it more efficiently by recovering queues in +reverse (LIFO) order. That means as clients actively consume messages +from the front of the queue, they are redeucing the work we have to do +in recovering from the back. (NOTE: this may not be compatible with +using the same recovery interfaces as the store.) + +** Selective replication +In this model it's easy to support selective replication of individual queues via +configuration. +- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. + Treated analogously to persistent/durable properties for the store. +- if not explicitly marked, provide a choice of default + - default is replicate (replicated message on replicated queue) + - default is don't replicate + - default is replicate persistent/durable messages. + +** Inconsistent errors + +The new design eliminates most sources of inconsistent errors in the +old design (connections, sessions, security, management etc.) and +eliminates the need to stall the whole cluster till an error is +resolved. We still have to handle inconsistent store errors when store +and cluster are used together. + +We also have to include error handling in the async completion loop to +guarantee N-way at least once: we should only report success to the +client when we know the message was replicated and stored on all N-1 +backups. + +TODO: We have a lot more options than the old cluster, need to figure +out the best approach, or possibly allow mutliple approaches. Need to +go thru the various failure cases. We may be able to do recovery on a +per-queue basis rather than restarting an entire node. + +** New members joining + +We should be able to catch up much faster than the the old design. A +new backup can catch up ("recover") the current cluster state on a +per-queue basis. +- queues can be updated in parallel +- "live" updates avoid the the "endless chase" + +During a "live" update several things are happening on a queue: +- clients are publishing messages to the back of the queue, replicated to the backup +- clients are consuming messages from the front of the queue, replicated to the backup. +- the primary is sending pre-existing messages to the new backup. + +The primary sends pre-existing messages in LIFO order - starting from +the back of the queue, at the same time clients are consuming from the front. +The active consumers actually reduce the amount of work to be done, as there's +no need to replicate messages that are no longer on the queue. + +* Steps to get there + +** Baseline replication +Validate the overall design get initial notion of performance. Just +message+wiring replication, no update/recovery for new members joining, +single CPG dispatch thread on backups, no failover, no transactions. + +** Failover +Electing primary, backups redirect to primary. Measure failover time +for large # clients. Strategies to minimise number of retries after a +failure. + +** Flow Control +Keep internal queues from over-flowing. Similar to internal flow control in old cluster. 
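To make the intended bound concrete, here is a minimal watermark-based sketch of such an internal queue. It is illustrative only - not the existing PollableQueue code - and stopReading/resumeReading merely stand in for "stop/resume polling client connections":

// Illustrative only: a bounded internal work queue with high/low watermarks.
// The callbacks are assumptions standing in for throttling client I/O.
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <iostream>

template <class T>
class BoundedWorkQueue {
public:
    BoundedWorkQueue(size_t high, size_t low,
                     std::function<void()> stopReading,
                     std::function<void()> resumeReading)
        : high_(high), low_(low), stopped_(false),
          stopReading_(stopReading), resumeReading_(resumeReading) {}

    // Called from the CPG deliver thread or a connection thread.
    void push(const T& item) {
        std::lock_guard<std::mutex> l(lock_);
        queue_.push_back(item);
        if (!stopped_ && queue_.size() >= high_) { // over the high watermark
            stopped_ = true;
            stopReading_();                        // throttle producers
        }
    }

    // Called from the worker thread that drains the queue.
    bool pop(T& item) {
        std::lock_guard<std::mutex> l(lock_);
        if (queue_.empty()) return false;
        item = queue_.front();
        queue_.pop_front();
        if (stopped_ && queue_.size() <= low_) {   // back under the low watermark
            stopped_ = false;
            resumeReading_();                      // let producers run again
        }
        return true;
    }

private:
    std::mutex lock_;
    std::deque<T> queue_;
    const size_t high_, low_;
    bool stopped_;
    std::function<void()> stopReading_, resumeReading_;
};

int main() {
    BoundedWorkQueue<std::string> q(
        1000, 100,
        [] { std::cout << "stop reading client connections\n"; },
        [] { std::cout << "resume reading client connections\n"; });
    q.push("enqueue event");
    std::string item;
    while (q.pop(item)) { /* replicate/apply the event */ }
}

The gap between the high and low watermarks gives hysteresis, so producers are not toggled on every push/pop.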
+Needed for realistic performance/stress tests + +** Concurrency +Experiment with multiple threads on backups, multiple CPG groups. + +** Recovery/new member joining +Initial status handshake for new member. Recovering queues from the back. + +** Transactions +TODO: How to implement transactions with concurrency. Worst solution: +a global --cluster-use-transactions flag that forces single thread +mode. Need to find a better solution. diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt index 7adb46fee3..936530a39a 100644 --- a/cpp/design_docs/new-cluster-design.txt +++ b/cpp/design_docs/new-cluster-design.txt @@ -17,7 +17,6 @@ # under the License. * A new design for Qpid clustering. - ** Issues with current design. The cluster is based on virtual synchrony: each broker multicasts @@ -84,19 +83,21 @@ context. ** A new cluster design. -Clearly defined interface between broker code and cluster plug-in. +1. Clearly defined interface between broker code and cluster plug-in. -Replicate queue events rather than client data. -- Broker behavior only needs to match per-queue. -- Smaller amount of code (queue implementation) that must behave predictably. -- Events only need be serialized per-queue, allows concurrency between queues +2. Replicate queue events rather than client data. + - Only requires consistent enqueue order. + - Events only need be serialized per-queue, allows concurrency between queues + - Allows for replicated and non-replicated queues. -Use a moving queue ownership protocol to agree order of dequeues. -No longer relies on identical state and lock-step behavior to cause -identical dequeues on each broker. +3. Use a lock protocol to agree order of dequeues: only the broker + holding the lock can acqiure & dequeue. No longer relies on + identical state and lock-step behavior to cause identical dequeues + on each broker. -Each queue has an associated thread-context. Events for a queue are executed -in that queues context, in parallel with events for other queues. +4. Use multiple CPG groups to process different queues in + parallel. Use a fixed set of groups and hash queue names to choose + the group for each queue. *** Requirements @@ -149,7 +150,7 @@ a release-queue event, allowing another interested broker to take ownership. *** Asynchronous completion of accept -### HERE + In acknowledged mode a message is not forgotten until it is accepted, to allow for requeue on rejection or crash. The accept should not be completed till the message has been forgotten. @@ -162,19 +163,32 @@ On receiving an accept the broker: NOTE: The message store does not currently implement asynchronous completions of accept, this is a bug. +*** Multiple CPG groups. + +The old cluster was bottlenecked by processing everything in a single +CPG deliver thread. + +The new cluster uses a set of CPG groups, one per core. Queue names +are hashed to give group indexes, so statistically queues are likely +to be spread over the set of groups. + +Operations on a given queue always use the same group, so we have +order within each queue, but operations on different queues can use +different groups giving greater throughput sending to CPG and multiple +handler threads to process CPG messages. + ** Inconsistent errors. -The new design eliminates most sources of inconsistent errors -(connections, sessions, security, management etc.) The only points -where inconsistent errors can occur are at enqueue and dequeue (most -likely store-related errors.) 
+An inconsistent error means that after multicasting an enqueue, accept +or dequeue, some brokers succeed in processing it and others fail. -The new design can use the exisiting error-handling protocol with one -major improvement: since brokers are no longer required to maintain -identical state they do not have to stall processing while an error is -being resolved. +The new design eliminates most sources of inconsistent errors in the +old broker: connections, sessions, security, management etc. Only +store journal errors remain. -#TODO: The only source of dequeue errors is probably an unrecoverable journal failure. +The new inconsistent error protocol is similar to the old one with one +major improvement: brokers do not have to stall processing while an +error is being resolved. ** Updating new members @@ -193,60 +207,44 @@ catch up (which is not guaranteed to happen in a bounded time.) With the new cluster design only exchanges, queues, bindings and messages need to be replicated. -Update of wiring (exchanges, queues, bindings) is the same as current -design. - -Update of messages is different: -- per-queue rather than per-broker, separate queues can be updated in parallel. -- updates queues in reverse order to eliminate unbounded catch-up -- does not require updater & updatee to stall during update. +We update individual objects (queues and exchanges) independently. +- create queues first, then update all queues and exchanges in parallel. +- multiple updater threads, per queue/exchange. -Replication events, multicast to cluster: -- enqueue(q,m): message m pushed on back of queue q . -- acquire(q,m): mark m acquired -- dequeue(q,m): forget m. -Messages sent on update connection: -- update_front(q,m): during update, receiver pushes m to *front* of q -- update_done(q): during update, update of q is complete. +Queue updater: +- marks the queue position at the sync point +- sends messages starting from the sync point working towards the head of the queue. +- send "done" message. -Updater: -- when updatee joins set iterator i = q.end() -- while i != q.begin(): --i; send update_front(q,*i) to updatee -- send update_done(q) to updatee +Queue updatee: +- enqueues received from CPG: add to back of queue as normal. +- dequeues received from CPG: apply if found, else save to check at end of update. +- messages from updater: add to the *front* of the queue. +- update complete: apply any saved dequeues. -Updatee: -- q initially in locked state, can't dequeue locally. -- start processing replication events for q immediately (enqueue, dequeue, acquire etc.) -- receive update_front(q,m): q.push_front(m) -- receive update_done(q): q can be unlocked for local dequeing. +Exchange updater: +- updater: send snapshot of exchange as it was at the sync point. -Benefits: -- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated. -- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update. -- During update consumers actually help by removing messages before they need to be updated. -- Needs no separate "work to do" queue, only the broker queues themselves. +Exchange updatee: +- queue exchange operations after the sync point. +- when snapshot is received: apply saved operations. -# TODO how can we recover from updater crashing before update complete? -# Clear queues that are not updated & send request for udpates on those queues? +Note: +- Updater is active throughout, no stalling. 
+- Consuming clients actually reduce the size of the update. +- Updatee stalls clients until the update completes. + (Note: May be possible to avoid updatee stall as well, needs thought) -# TODO updatee may receive a dequeue for a message it has not yet seen, needs -# to hold on to that so it can drop the message when it is seen. -# Similar problem exists for wiring? +** Internal cluster interface -** Cluster API - -The new cluster API is similar to the MessageStore interface. -(Initially I thought it would be an extension of the MessageStore interface, -but as the design develops it seems better to make it a separate interface.) +The new cluster interface is similar to the MessageStore interface, but +provides more detail (message positions) and some additional call +points (e.g. acquire) The cluster interface captures these events: - wiring changes: queue/exchange declare/bind - message enqueued/acquired/released/rejected/dequeued. - -The cluster will require some extensions to the Queue: -- Queues can be "locked", locked queues are ignored by IO-driven output. -- Cluster must be able to apply queue events from the cluster to a queue. - These appear to fit into existing queue operations. +- transactional events. ** Maintainability @@ -273,106 +271,48 @@ A number of specific ways the code will be simplified: ** Performance -The only way to verify the relative performance of the new design is -to prototype & profile. The following points suggest the new design -may scale/perform better: - -Some work moved from virtual synchrony thread to connection threads: -- All connection/session logic moves to connection thread. -- Exchange routing logic moves to connection thread. -- On local broker dequeueing is done in connection thread -- Local broker dequeue is IO driven as for a standalone broker. - -For queues with all consumers on a single node dequeue is all -IO-driven in connection thread. Pay for time-sharing only if queue has -consumers on multiple brokers. - -Doing work for different queues in parallel scales on multi-core boxes when -there are multiple queues. - -One difference works against performance, thre is an extra -encode/decode. The old design multicasts raw client data and decodes -it in the virtual synchrony thread. The new design would decode -messages in the connection thread, re-encode them for multicast, and -decode (on non-local brokers) in the virtual synchrony thread. There -is extra work here, but only in the *connection* thread: on a -multi-core machine this happens in parallel for every connection, so -it probably is not a bottleneck. There may be scope to optimize -decode/re-encode by re-using some of the original encoded data, this -could also benefit the stand-alone broker. - -** Asynchronous queue replication - -The existing "asynchronous queue replication" feature maintains a -passive backup passive backup of queues on a remote broker over a TCP -connection. - -The new cluster replication protocol could be re-used to implement -asynchronous queue replication: its just a special case where the -active broker is always the queue owner and the enqueue/dequeue -messages are sent over a TCP connection rather than multicast. - -The new update update mechanism could also work with 'asynchronous -queue replication', allowing such replication (over a TCP connection -on a WAN say) to be initiated after the queue had already been created -and been in use (one of the key missing features). 
- -** Increasing Concurrency and load sharing - -The current cluster is bottlenecked by processing everything in the -CPG deliver thread. By removing the need for identical operation on -each broker, we open up the possiblility of greater concurrency. - -Handling multicast enqueue, acquire, accpet, release etc: concurrency -per queue. Operatons on different queues can be done in different -threads. - -The new design does not force each broker to do all the work in the -CPG thread so spreading load across cluster members should give some -scale-up. - -** Misc outstanding issues & notes - -Replicating wiring -- Need async completion of wiring commands? -- qpid.sequence_counter: need extra work to support in new design, do we care? - -Cluster+persistence: -- finish async completion: dequeue completion for store & cluster -- cluster restart from store: clean stores *not* identical, pick 1, all others update. -- need to generate cluster ids for messages recovered from store. - -Live updates: we don't need to stall brokers during an update! -- update on queue-by-queue basis. -- updatee locks queues during update, no dequeue. -- update in reverse: don't update messages dequeued during update. -- updatee adds update messages at front (as normal), replicated messages at back. -- updater starts from back, sends "update done" when it hits front of queue. - -Flow control: need to throttle multicasting -1. bound the number of outstanding multicasts. -2. ensure the entire cluster keeps up, no unbounded "lag" -The existing design uses read-credit to solve 1., and does not solve 2. -New design should stop reading on all connections while flow control -condition exists? - -Can federation also be unified, at least in configuration? - -Consider queues (and exchanges?) as having "reliability" attributes: -- persistent: is the message stored on disk. -- backed-up (to another broker): active/passive async replication. -- replicated (to a cluster): active/active multicast replication to cluster. -- federated: federation link to a queue/exchange on another broker. - -"Reliability" seems right for the first 3 but not for federation, is -there a better term? - -Clustering and scalability: new design may give us the flexibility to -address scalability as part of cluster design. Think about -relationship to federation and "fragmented queues" idea. - -* Design debates/descisions +The standalone broker processes _connections_ concurrently, so CPU +usage increases as you add more connections. + +The new cluster processes _queues_ concurrently, so CPU usage increases as you +add more queues. + +In both cases, CPU usage peaks when the number of "units of + concurrency" (connections or queues) goes above the number of cores. + +When all consumers on a queue are connected to the same broker the new +cluster uses the same messagea allocation threading/logic as a +standalone broker, with a little extra asynchronous book-keeping. + +If a queue has multiple consumers connected to multiple brokers, the +new cluster time-shares the queue which is less efficient than having +all consumers on a queue connected to the same broker. +** Flow control +New design does not queue up CPG delivered messages, they are +processed immediately in the CPG deliver thread. This means that CPG's +flow control is sufficient for qpid. + +** Live upgrades + +Live upgrades refers to the ability to upgrade a cluster while it is +running, with no downtime. 
Each brokers in the cluster is shut down, +and then re-started with a new version of the broker code. + +To achieve this +- Cluster protocl XML file has a new element <version number=N> attached + to each method. This is the version at which the method was added. +- New versions can only add methods, existing methods cannot be changed. +- The cluster handshake for new members includes the protocol version + at each member. +- The cluster's version is the lowest version among its members. +- A newer broker can join and older cluster. When it does, it must restrict + itself to speaking the older version protocol. +- When the cluster version increases (because the lowest version member has left) + the remaining members may move up to the new version. + + +* Design debates ** Active/active vs. active passive An active-active cluster can be used in an active-passive mode. In @@ -385,7 +325,7 @@ An active/passive implementation allows some simplifications over active/active: - can do immediate local enqueue and still guarantee order. Active/passive introduces a few extra requirements: -- Exactly one broker hast to take over if primary fails. +- Exactly one broker has to take over if primary fails. - Passive members must refuse client connections. - On failover, clients must re-try all known addresses till they find the active member. @@ -393,43 +333,17 @@ Active/active benefits: - A broker failure only affects the subset of clients connected to that broker. - Clients can switch to any other broker on failover - Backup brokers are immediately available on failover. -- Some load sharing: reading from client + multicast only done on direct node. - -Active/active drawbacks: -- Co-ordinating message acquisition may impact performance (not tested) -- Code may be more complex that active/passive. +- As long as a client can connect to any broker in the cluster, it can be served. Active/passive benefits: -- Don't need message allocation strategy, can feed consumers at top speed. -- Code may be simpler than active/active. +- Don't need to replicate message allocation, can feed consumers at top speed. Active/passive drawbacks: - All clients on one node so a failure affects every client in the system. - After a failure there is a "reconnect storm" as every client reconnects to the new active node. - After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary. - Clients must find the single active node, may involve multiple connect attempts. +- No service if a partition separates a client from the active broker, + even if the client can see other brokers. -** Total ordering. - -Initial thinking: allow message ordering to differ between brokers. -New thinking: use CPG total ordering, get identical ordering on all brokers. -- Allowing variation in order introduces too much chance of unexpected behavior. -- Usign total order allows other optimizations, see Message Identifiers below. - -** Message identifiers. - -Initial thinking: message ID = CPG node id + 64 bit sequence number. -This involves a lot of mapping between cluster IDs and broker messsages. - -New thinking: message ID = queue name + queue position. -- Removes most of the mapping and memory management for cluster code. -- Requires total ordering of messages (see above) - -** Message rejection - -Initial thinking: add special reject/rejected points to cluster interface so -rejected messages could be re-queued without multicast. 
-New thinking: treat re-queueing after reject as entirely new message. -- Simplifies cluster interface & implementation -- Not on the critical path. diff --git a/cpp/design_docs/new-cluster-plan.txt b/cpp/design_docs/new-cluster-plan.txt index 781876e55a..626e443be7 100644 --- a/cpp/design_docs/new-cluster-plan.txt +++ b/cpp/design_docs/new-cluster-plan.txt @@ -17,376 +17,156 @@ # specific language governing permissions and limitations # under the License. +* Status of impementation -Notes on new cluster implementation. See also: new-cluster-design.txt +Meaning of priorities: +[#A] Essential for basic functioning. +[#B] Required for first release. +[#C] Can be addressed in a later release. -* Implementation plan. +The existig prototype is bare bones to do performance benchmarks: +- Implements publish and consumer locking protocol. +- Defered delivery and asynchronous completion of message. +- Optimize the case all consumers are on the same node. +- No new member updates, no failover updates, no transactions, no persistence etc. -Co-existence with old cluster code and tests: -- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. -- Double up tests with old version/new version as the new code develops. +Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/ -Minimal POC for message delivery & perf test. -- no wiring replication, no updates, no failover, no persistence, no async completion. -- just implement publish and acquire/dequeue locking protocol. -- optimize the special case where all consumers are on the same node. -- measure performance: compare active-passive and active-active modes of use. +** Similarities to existing cluster. -Full implementation of transient cluster -- Update (based on existing update), async completion etc. -- Passing all existing transient cluster tests. +/Active-active/: the new cluster can be a drop-in replacement for the +old, existing tests & customer deployment configurations are still +valid. -Persistent cluster -- Make sure async completion works correctly. -- InitialStatus protoocl etc. to support persistent start-up (existing code) -- cluster restart from store: stores not identical. Load one, update the rest. - - assign cluster ID's to messages recovered from store, don't replicate. +/Virtual synchrony/: Uses corosync to co-ordinate activity of members. -Improved update protocol -- per-queue, less stalling, bounded catch-up. +/XML controls/: Uses XML to define the primitives multicast to the +cluster. -* Task list +** Differences with existing cluster. -** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. +/Report rather than predict consumption/: brokers explicitly tell each +other which messages have been acquired or dequeued. This removes the +major cause of bugs in the existing cluster. -NOTE: as implementation questions arise, take the easiest option and make -a note for later optimization/improvement. +/Queue consumer locking/: to avoid duplicates only one broker can acquire or +dequeue messages at a time - while has the consume-lock on the +queue. If multiple brokers are consuming from the same queue the lock +is passed around to time-share access to the queue. -*** Tests -- python test: 4 senders, numbered messages, 4 receivers, verify message set. -- acquire then release messages: verify can be dequeued on any member -- acquire then kill broker: verify can be dequeued other members. -- acquire then reject: verify goes on alt-exchange once only. 
+/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting +the concurrency of the host) to allow concurrent processing on +different queues. Queues are hashed onto the groups. -*** DONE broker::Cluster interface and call points. +* Completed tasks +** DONE [#A] Minimal POC: publish/acquire/dequeue protocol. + CLOSED: [2011-10-05 Wed 16:03] -Initial interface commited. +Defines broker::Cluster interface and call points. +Initial interface commite -*** Main classes +Main classes +Core: central object holding cluster classes together (replaces cluster::Cluster) +BrokerContext: implements broker::Cluster interface. +QueueContext: Attached to a broker::Queue, holds cluster status. +MessageHolder:holds local messages while they are being enqueued. -BrokerHandler: -- implements broker::Cluster intercept points. -- sends mcast events to inform cluster of local actions. -- thread safe, called in connection threads. +Implements multiple CPG groups for better concurrency. -LocalMessageMap: -- Holds local messages while they are being enqueued. -- thread safe: called by both BrokerHandler and MessageHandler - -MessageHandler: -- handles delivered mcast messages related to messages. -- initiates local actions in response to mcast events. -- thread unsafe, only called in deliver thread. -- maintains view of cluster state regarding messages. +** DONE [#A] Large message replication. + CLOSED: [2011-10-05 Wed 17:22] +Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame) -QueueOwnerHandler: -- handles delivered mcast messages related to queue consumer ownership. -- thread safe, called in deliver, connection and timer threads. -- maintains view of cluster state regarding queue ownership. - -cluster::Core: class to hold new cluster together (replaces cluster::Cluster) -- thread safe: manage state used by both MessageHandler and BrokerHandler - -The following code sketch illustrates only the "happy path" error handling -is omitted. - -*** BrokerHandler -Types: -- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; } -- struct - -NOTE: -- Messages on queues are identified by a queue name + a position. -- Messages being routed are identified by a sequence number. - -Members: -- thread_local bool noReplicate // suppress replication. -- thread_local bool isRouting // suppress operations while routing -- Message localMessage[SequenceNumber] // local messages being routed. -- thread_local SequenceNumber routingSequence - -NOTE: localMessage is also modified by MessageHandler. - -broker::Cluster intercept functions: - -routing(msg) - if noReplicate: return - # Supress everything except enqueues while we are routing. - # We don't want to replicate acquires & dequeues caused by an enqueu, - # e.g. removal of messages from ring/LV queues. - isRouting = true - -enqueue(qmsg): - if noReplicate: return - if routingSequence == 0 # thread local - routingSequence = nextRoutingSequence() - mcast create(encode(qmsg.msg),routingSeq) - mcast enqueue(qmsg.q,routingSeq) - -routed(msg): - if noReplicate: return - isRouting = false - -acquire(qmsg): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - if msg.id: mcast acquire(qmsg) - -release(QueuedMessage) - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - mcast release(qmsg) - -accept(QueuedMessage): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. 
- mcast accept(qmsg) - -reject(QueuedMessage): - isRejecting = true - mcast reject(qmsg) - -# FIXME no longer needed? -drop(QueuedMessage) - cleanup(qmsg) - -*** MessageHandler and mcast messages -Types: -- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } -- struct QueueKey { MessageId id; QueueName q; } -- typedef map<QueueKey, QueueEntry> Queue -- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; } - -Members: -- QueueEntry enqueued[QueueKey] -- Node node[NodeId] - -Mcast messages in Message class: - -create(msg,seq) - if sender != self: node[sender].routing[seq] = decode(msg) - -enqueue(q,seq): - id = (sender,seq) - if sender == self: - enqueued[id,q] = (localMessage[seq], acquired=None) - else: - msg = sender.routing[seq] - enqueued[id,q] = (qmsg, acquired=None) - with noReplicate=true: qmsg = broker.getQueue(q).push(msg) - -routed(seq): - if sender == self: localMessage.erase(msg.id.seq) - else: sender.routing.erase(seq) - -acquire(id,q): - enqueued[id,q].acquired = sender - node[sender].acquired.push_back((id,q)) - if sender != self: - with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q]) - -release(id,q) - enqueued[id,q].acquired = None - node[sender].acquired.erase((id,q)) - if sender != self - with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q]) - -reject(id,q): - sender.routing[id] = enqueued[id,q] # prepare for re-queueing - -rejected(id,q) - sender.routing.erase[id] - -dequeue(id,q) - entry = enqueued[id,q] - enqueued.erase[id,q] - node[entry.acquired].acquired.erase(id,q) - if sender != self: - with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg) - -member m leaves cluster: - for key in node[m].acquired: - release(key.id, key.q) - node.erase(m) - -*** Queue consumer locking - -When a queue is locked it does not deliver messages to its consumers. - -New broker::Queue functions: -- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit. -- startConsumers(): reset consumersStopped flag - -Implementation sketch, locking omitted: - -void Queue::stopConsumers() { - consumersStopped = true; - while (consumersBusy) consumersBusyMonitor.wait(); -} - -void Queue::startConsumers() { - consumersStopped = false; - listeners.notify(); -} - -bool Queue::dispatch(consumer) { - if (consumersStopped) return false; - ++consumersBusy; - do_regular_dispatch_body() - if (--consumersBusy == 0) consumersBusyMonitor.notify(); -} - -*** QueueOwnerHandler - -Invariants: -- Each queue is owned by at most one node at any time. -- Each node is interested in a set of queues at any given time. -- A queue is un-owned if no node is interested. - -The queue owner releases the queue when -- it loses interest i.e. queue has no consumers with credit. -- a configured time delay expires and there are other interested nodes. - -The owner mcasts release(q). On delivery the new queue owner is the -next node in node-id order (treating nodes as a circular list) -starting from the old owner that is interested in the queue. - -Queue consumers initially are stopped, only started when we get -ownership from the cluster. - -Thread safety: called by deliver, connection and timer threads, needs locking. - -Thread safe object per queue holding queue ownership status. -Called by deliver, connection and timer threads. 
- -class QueueOwnership { - bool owned; - Timer timer; - BrokerQueue q; - - drop(): # locked - if owned: - owned = false - q.stopConsumers() - mcast release(q.name, false) - timer.stop() - - take(): # locked - if not owned: - owned = true - q.startConsumers() - timer.start(timeout) - - timer.fire(): drop() -} - -Data Members, only modified/examined in deliver thread: -- typedef set<NodeId> ConsumerSet -- map<QueueName, ConsumerSet> consumers -- map<QueueName, NodeId> owner +* Open questions -Thread safe data members, accessed in connection threads (via BrokerHandler): -- map<QueueName, QueueOwnership> ownership +** TODO [#A] Queue sequence numbers vs. independant message IDs. + SCHEDULED: <2011-10-07 Fri> -Multicast messages in QueueOwner class: +Current prototype uses queue sequence numbers to identify +message. This is tricky for updating new members as the sequence +numbers are only known on delivery. -consume(q): - if sender==self and consumers[q].empty(): ownership[q].take() - consumers[q].insert(sender) +Independent message IDs that can be generated and sent with the message simplify +this and potentially allow performance benefits by relaxing total ordering. +However they imply additional map lookups that might hurt performance. -release(q): - asssert(owner[q] == sender and owner[q] in consumers[q]) - owner[q] = circular search from sender in consumers[q] - if owner==self: ownership[q].take() +- [X] Prototype independent message IDs, check performance. +Throughput worse by 30% in contented case, 10% in uncontended. +Sticking with queue sequence numbers. -cancel(q): - assert(queue[q].owner != sender) # sender must release() before cancel() - consumers[q].erase(sender) +* Outstanding Tasks +** TODO [#A] Defer and async completion of wiring commands. -member-leaves: - for q in queue: if owner[q] = left: left.release(q) +Testing requirement: Many tests assume wiring changes are visible +across the cluster once the commad completes. -Need 2 more intercept points in broker::Cluster: +Name clashes: need to avoid race if same name queue/exchange declared +on 2 brokers simultaneously -consume(q,consumer,consumerCount) - Queue::consume() - if consumerCount == 1: mcast consume(q) +** TODO [#A] Passing all existing cluster tests. -cancel(q,consumer,consumerCount) - Queue::cancel() - if consumerCount == 0: - ownership[q].drop() - mcast cancel(q) +The new cluster should be a drop-in replacement for the old, so it +should be able to pass all the existing tests. -#TODO: lifecycle, updating cluster data structures when queues are destroyed - -*** Increasing concurrency -The major performance limitation of the old cluster is that it does -everything in the single CPG deliver thread context. - -We can get additional concurrency by creating a thread context _per queue_ -for queue operations: enqueue, acquire, accept etc. - -We associate a PollableQueue of queue operations with each AMQP queue. -The CPG deliver thread would -- build messages and associate with cluster IDs. -- push queue ops to the appropriate PollableQueue to be dispatched the queues thread. - -Serializing operations on the same queue avoids contention, but takes advantage -of the independence of operations on separate queues. +** TODO [#A] Update to new members joining. -*** Re-use of existing cluster code -- re-use Event -- re-use Multicaster -- re-use same PollableQueueSetup (may experiment later) -- new Core class to replace Cluster. -- keep design modular, keep threading rules clear. +Need to resolve [[Queue sequence numbers vs. 
independant message IDs]] first. +- implicit sequence numbers are more tricky to replicate to new member. -** TODO [#B] Large message replication. -Multicast should encode messages in fixed size buffers (64k)? -Can't assume we can send message in one chunk. -For 0-10 can use channel numbers & send whole frames packed into larger buffer. -** TODO [#B] Transaction support. -Extend broker::Cluster interface to capture transaction context and completion. -Sequence number to generate per-node tx IDs. -Replicate transaction completion. -** TODO [#B] Batch CPG multicast messages -The new cluster design involves a lot of small multicast messages, -they need to be batched into larger CPG messages for efficiency. -** TODO [#B] Genuine async completion -Replace current synchronous waiting implementation with genuine async completion. +Update individual objects (queues and exchanges) independently. +- create queues first, then update all queues and exchanges in parallel. +- multiple updater threads, per queue/exchange. +- updater sends messages to special exchange(s) (not using extended AMQP controls) + +Queue updater: +- marks the queue position at the sync point +- sends messages starting from the sync point working towards the head of the queue. +- send "done" message. +Note: updater remains active throughout, consuming clients actually reduce the +size of the update. -Test: enhance test_store.cpp to defer enqueueComplete till special message received. +Queue updatee: +- enqueues received from CPG: add to back of queue as normal. +- dequeues received from CPG: apply if found, else save to check at end of update. +- messages from updater: add to the *front* of the queue. +- update complete: apply any saved dequeues. -Async callback uses *requestIOProcessing* to queue action on IO thread. +Exchange updater: +- updater: send snapshot of exchange as it was at the sync point. -** TODO [#B] Async completion of accept when dequeue completes. -Interface is already there on broker::Message, just need to ensure -that store and cluster implementations call it appropriately. +Exchange updatee: +- queue exchange operations after the sync point. +- when snapshot is received: apply saved operations. -** TODO [#B] Replicate wiring. -From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command. +Updater remains active throughout. +Updatee stalls clients until the update completes. -** TODO [#B] New members joining - first pass +Updating queue/exchange/binding objects is via the same encode/decode +that is used by the store. Updatee to use recovery interfaces to +recover? -Re-use update code from old cluster but don't replicate sessions & -connections. +** TODO [#A] Failover updates to client. +Implement the amq.failover exchange to notify clients of membership. -Need to extend it to send cluster IDs with messages. +** TODO [#B] Initial status protocol. +Handshake to give status of each broker member to new members joining. +Status includes +- persistent store state (clean, dirty) +- cluster protocol version. -Need to replicate the queue ownership data as part of the update. +** TODO [#B] Replace boost::hash with our own hash function. +The hash function is effectively part of the interface so +we need to be sure it doesn't change underneath us. -** TODO [#B] Persistence support. -InitialStatus protoocl etc. to support persistent start-up (existing code) +** TODO [#B] Persistent cluster support. 
+Initial status protoocl to support persistent start-up (see existing code) Only one broker recovers from store, update to others. Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover. -** TODO [#B] Handle other ways that messages can leave a queue. - -Other ways (other than via a consumer) that messages are take off a queue. - -NOTE: Not controlled by queue lock, how to make them consistent? - +** TODO [#B] Management support +Replicate management methods that modify queues - e.g. move, purge. Target broker may not have all messages on other brokers for purge/destroy. - Queue::move() - need to wait for lock? Replicate? - Queue::get() - ??? @@ -395,66 +175,48 @@ Target broker may not have all messages on other brokers for purge/destroy. Need to add callpoints & mcast messages to replicate these? -** TODO [#B] Flow control for internal queues. - -Need to bound the size of internal queues: delivery and multicast. -- stop polling for read on client connections when we reach a bound. -- restart polling when we get back under it. - -That will stop local multicasting, we still have to deal with remote -multicasting (note existing cluster does not do this.) Something like: -- when over bounds multicast a flow-control event. -- on delivery of flow-control all members stop polling to read client connections -- when back under bounds send flow-control-end, all members resume -- if flow-controling member dies others resume - -** TODO [#B] Integration with transactions. -Do we want to replicate during transaction & replicate commit/rollback -or replicate only on commit? -No integration with DTX transactions. -** TODO [#B] Make new cluster work with replication exchange. -Possibly re-use some common logic. Replication exchange is like clustering -except over TCP. -** TODO [#B] Better concurrency, scalabiility on multi-cores. -Introduce PollableQueue of operations per broker queue. Queue up mcast -operations (enqueue, acquire, accept etc.) to be handled concurrently -on different queue. Performance testing to verify improved scalability. -** TODO [#C] Async completion for declare, bind, destroy queues and exchanges. -Cluster needs to complete these asynchronously to guarantee resources -exist across the cluster when the command completes. +** TODO [#B] TX transaction support. +Extend broker::Cluster interface to capture transaction context and completion. +Running brokers exchange TX information. +New broker update includes TX information. -** TODO [#C] Allow non-replicated exchanges, queues. + // FIXME aconway 2010-10-18: As things stand the cluster is not + // compatible with transactions + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? + // - no transaction context associated with messages in the Cluster interface. + // - no call to Cluster::accept in Queue::dequeueCommitted -Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects. -- save replicated status to store. -- support in management tools. -Replicated exchange: replicate binds to replicated queues. -Replicated queue: replicate all messages. +** TODO [#B] DTX transaction support. +Extend broker::Cluster interface to capture transaction context and completion. +Running brokers exchange DTX information. +New broker update includes DTX information. -** TODO [#C] New members joining - improved. +** TODO [#B] Async completion of accept. +When this is fixed in the standalone broker, it should be fixed for cluster. 
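To illustrate the completion model assumed here - a client command such as accept or enqueue is held open until the local store and the other cluster members have confirmed - a minimal counter-based sketch follows. The class and callback names are assumptions, not the broker's actual async-completion interfaces:

// Illustrative sketch of N-way asynchronous completion: report success to the
// client only after every confirming party (local store, self-delivery, other
// members) has called confirm(). Names are assumptions, not real broker APIs.
#include <atomic>
#include <functional>

class ReplicationCompletion {
public:
    // expected = number of confirmations required before completing.
    ReplicationCompletion(unsigned expected, std::function<void()> onComplete)
        : remaining_(expected), onComplete_(onComplete) {}

    // Called once per confirming party, possibly from different threads.
    void confirm() {
        if (remaining_.fetch_sub(1) == 1)
            onComplete_();   // last confirmation: complete the client command
    }

private:
    std::atomic<unsigned> remaining_;
    std::function<void()> onComplete_;
};

An accept would create one of these with expected = local store + number of other members that must confirm, and complete the AMQP command only from onComplete; handling a member that fails or leaves before confirming belongs with the inconsistent-error policy above.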
-Replicate wiring like old cluster, stall for wiring but not for -messages. Update messages on a per-queue basis from back to front. +** TODO [#B] Network partitions and quorum. +Re-use existing implementation. -Updater: -- stall & push wiring: declare exchanges, queues, bindings. -- start update iterator thread on each queue. -- unstall and process normally while iterator threads run. +** TODO [#B] Review error handling, put in a consitent model. +- [ ] Review all asserts, for possible throw. +- [ ] Decide on fatal vs. non-fatal errors. -Update iterator thread: -- starts at back of updater queue, message m. -- send update_front(q,m) to updatee and advance towards front -- at front: send update_done(q) +** TODO [#B] Implement inconsistent error handling policy. +What to do if a message is enqueued sucessfully on the local broker, +but fails on one or more backups - e.g. due to store limits? +- we have more flexibility, we don't *have* to crash +- but we've loste some of our redundancy guarantee, how should we inform client? -Updatee: -- stall, receive wiring, lock all queues, mark queues "updating", unstall -- update_front(q,m): push m to *front* of q -- update_done(q): mark queue "ready" +** TODO [#C] Allow non-replicated exchanges, queues. -Updatee cannot take the queue consume lock for a queue that is updating. -Updatee *can* push messages onto a queue that is updating. +Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects. +- save replicated status to store. +- support in management tools. +Replicated queue: replicate all messages. +Replicated exchange: replicate bindings to replicated queues only. -TODO: Is there any way to eliminate the stall for wiring? +Configurable default? Defaults to true. ** TODO [#C] Refactoring of common concerns. @@ -469,9 +231,46 @@ Look for ways to capitalize on the similarity & simplify the code. In particular QueuedEvents (async replication) strongly resembles cluster replication, but over TCP rather than multicast. -** TODO [#C] Concurrency for enqueue events. -All enqueue events are being processed in the CPG deliver thread context which -serializes all the work. We only need ordering on a per queue basis, can we -enqueue in parallel on different queues and will that improve performance? + ** TODO [#C] Handling immediate messages in a cluster Include remote consumers in descision to deliver an immediate message? +** TODO [#C] Remove old cluster hacks and workarounds +The old cluster has workarounds in the broker code that can be removed. +- [ ] drop code to replicate management model. +- [ ] drop timer workarounds for TTL, management, heartbeats. +- [ ] drop "cluster-safe assertions" in broker code. +- [ ] drop connections, sessions, management from cluster update. +- [ ] drop security workarounds: cluster code now operates after message decoding. +- [ ] drop connection tracking in cluster code. +- [ ] simpler inconsistent-error handling code, no need to stall. + +** TODO [#C] Support for live upgrades. + +Allow brokers in a running cluster to be replaced one-by-one with a new version. +(see new-cluster-design for design notes.) + +The old cluster protocol was unstable because any changes in broker +state caused changes to the cluster protocol.The new design should be +much more stable. + +Points to implement in anticipation of live upgrade: +- Prefix each CPG message with a version number and length. + Version number determines how to decode the message. 
+- Brokers ignore messages that have a higher version number than they understand.
+- Protocol version XML element in cluster.xml, on each control.
+- Initial status protocol to include protocol version number.
+
+New member updates: use the store encode/decode for updates, and use the
+same backward compatibility strategy as the store. This allows for
+adding new elements to the end of structures but not changing or
+removing existing elements.
+
+** TODO [#C] Support for AMQP 1.0.
+
+* Testing
+** TODO [#A] Pass all existing cluster tests.
+Requires [[Defer and async completion of wiring commands.]]
+** TODO [#A] New cluster tests.
+Stress tests & performance benchmarks focused on changes in the new cluster:
+- concurrency by queues rather than connections.
+- different handling of shared queues when consumers are on different brokers.
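A rough sketch of the shape such a test could take, using the qpid::messaging C++ client API. The broker URL, queue names and message counts are placeholders, and a real benchmark would run many of these clients in parallel while killing and restarting brokers:

// Illustrative stress-test shape: drive many queues (per-queue concurrency)
// rather than many connections, with numbered messages so each queue's
// sequence can be verified after failover.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Duration.h>
#include <qpid/types/Variant.h>
#include <sstream>
#include <vector>
#include <cassert>

using namespace qpid::messaging;

int main() {
    const int QUEUES = 8, MESSAGES = 1000;
    Connection connection("localhost:5672");   // placeholder broker URL
    connection.open();
    Session session = connection.createSession();

    std::vector<Sender> senders;
    for (int q = 0; q < QUEUES; ++q) {
        std::ostringstream addr;
        addr << "stress-q" << q << "; {create: always}";
        senders.push_back(session.createSender(addr.str()));
    }
    // Send numbered messages round-robin so each queue gets an ordered sequence.
    for (int i = 0; i < MESSAGES; ++i) {
        Message m;
        m.getProperties()["n"] = i;
        senders[i % QUEUES].send(m);
    }
    // Drain each queue and verify its sequence is complete and in order.
    for (int q = 0; q < QUEUES; ++q) {
        std::ostringstream addr;
        addr << "stress-q" << q;
        Receiver r = session.createReceiver(addr.str());
        for (int i = q; i < MESSAGES; i += QUEUES) {
            Message m = r.fetch(Duration::SECOND * 5);
            assert(m.getProperties()["n"].asInt32() == i);
        }
    }
    session.acknowledge();
    connection.close();
    return 0;
}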