Diffstat (limited to 'cpp/design_docs/new-cluster-design.txt')
-rw-r--r-- | cpp/design_docs/new-cluster-design.txt | 285
1 file changed, 186 insertions, 99 deletions
diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt
index a162ea68ec..7adb46fee3 100644
--- a/cpp/design_docs/new-cluster-design.txt
+++ b/cpp/design_docs/new-cluster-design.txt
@@ -17,6 +17,7 @@ # under the License.
* A new design for Qpid clustering.
+
** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts
@@ -94,9 +95,8 @@ Use a moving queue ownership protocol to agree order of dequeues.
No longer relies on identical state and lock-step behavior to cause
identical dequeues on each broker.

-Use multiple CPG groups to process different queues in parallel. Use a
-fixed set of groups and hash queue names to choose the group for each
-queue.
+Each queue has an associated thread-context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.

*** Requirements
@@ -149,7 +149,7 @@ a release-queue event, allowing another interested broker to take ownership.

*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted, to
allow for requeue on rejection or crash. The accept should not be completed
till the message has been forgotten.
@@ -162,32 +162,19 @@ On receiving an accept the broker:

NOTE: The message store does not currently implement asynchronous
completions of accept, this is a bug.

-*** Multiple CPG groups.
-
-The old cluster was bottlenecked by processing everything in a single
-CPG deliver thread.
-
-The new cluster uses a set of CPG groups, one per core. Queue names
-are hashed to give group indexes, so statistically queues are likely
-to be spread over the set of groups.
-
-Operations on a given queue always use the same group, so we have
-order within each queue, but operations on different queues can use
-different groups giving greater throughput sending to CPG and multiple
-handler threads to process CPG messages.
-
** Inconsistent errors.

-An inconsistent error means that after multicasting an enqueue, accept
-or dequeue, some brokers succeed in processing it and others fail.
+The new design eliminates most sources of inconsistent errors
+(connections, sessions, security, management etc.) The only points
+where inconsistent errors can occur are at enqueue and dequeue (most
+likely store-related errors.)

-The new design eliminates most sources of inconsistent errors in the
-old broker: connections, sessions, security, management etc. Only
-store journal errors remain.
+The new design can use the existing error-handling protocol with one
+major improvement: since brokers are no longer required to maintain
+identical state they do not have to stall processing while an error is
+being resolved.

-The new inconsistent error protocol is similar to the old one with one
-major improvement: brokers do not have to stall processing while an
-error is being resolved.
+#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.

** Updating new members
@@ -206,44 +193,60 @@ catch up (which is not guaranteed to happen in a bounded time.)

With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.

-We update individual objects (queues and exchanges) independently.
-- create queues first, then update all queues and exchanges in parallel.
-- multiple updater threads, per queue/exchange.
+Update of wiring (exchanges, queues, bindings) is the same as current
+design.
+
+Update of messages is different:
+- per-queue rather than per-broker, separate queues can be updated in parallel.
+- updates queues in reverse order to eliminate unbounded catch-up
+- does not require updater & updatee to stall during update.

-Queue updater:
-- marks the queue position at the sync point
-- sends messages starting from the sync point working towards the head of the queue.
-- send "done" message.
+Replication events, multicast to cluster:
+- enqueue(q,m): message m pushed on back of queue q.
+- acquire(q,m): mark m acquired
+- dequeue(q,m): forget m.
+Messages sent on update connection:
+- update_front(q,m): during update, receiver pushes m to *front* of q
+- update_done(q): during update, update of q is complete.

-Queue updatee:
-- enqueues received from CPG: add to back of queue as normal.
-- dequeues received from CPG: apply if found, else save to check at end of update.
-- messages from updater: add to the *front* of the queue.
-- update complete: apply any saved dequeues.
+Updater:
+- when updatee joins set iterator i = q.end()
+- while i != q.begin(): --i; send update_front(q,*i) to updatee
+- send update_done(q) to updatee

-Exchange updater:
-- updater: send snapshot of exchange as it was at the sync point.
+Updatee:
+- q initially in locked state, can't dequeue locally.
+- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
+- receive update_front(q,m): q.push_front(m)
+- receive update_done(q): q can be unlocked for local dequeuing.

-Exchange updatee:
-- queue exchange operations after the sync point.
-- when snapshot is received: apply saved operations.
+Benefits:
+- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
+- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
+- During update consumers actually help by removing messages before they need to be updated.
+- Needs no separate "work to do" queue, only the broker queues themselves.

-Note:
-- Updater is active throughout, no stalling.
-- Consuming clients actually reduce the size of the update.
-- Updatee stalls clients until the update completes.
- (Note: May be possible to avoid updatee stall as well, needs thought)
+# TODO how can we recover from updater crashing before update complete?
+# Clear queues that are not updated & send request for updates on those queues?

-** Internal cluster interface
+# TODO updatee may receive a dequeue for a message it has not yet seen, needs
+# to hold on to that so it can drop the message when it is seen.
+# Similar problem exists for wiring?
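
For illustration, the updater/updatee steps above might look roughly like the
sketch below. This is not Qpid code: Message, UpdateConnection and
ReplicatedQueue are invented names, and locking, error handling and the real
wire protocol are elided.

// Sketch only: reverse-order queue update (names invented, not Qpid code).
#include <deque>
#include <string>

struct Message { std::string content; };

// Stand-in for the update connection between updater and updatee.
struct UpdateConnection {
    void updateFront(const std::string& q, const Message& m) { /* send update_front(q,m) */ }
    void updateDone(const std::string& q)                    { /* send update_done(q)    */ }
};

// Updater: walk the queue from back to front, so messages that consumers
// remove while the update is running never have to be sent at all.
void updateQueue(const std::string& name, const std::deque<Message>& q,
                 UpdateConnection& updatee) {
    for (std::deque<Message>::const_reverse_iterator i = q.rbegin(); i != q.rend(); ++i)
        updatee.updateFront(name, *i);   // at most q.size() messages, no unbounded catch-up
    updatee.updateDone(name);
}

// Updatee: replicated enqueues go on the back as normal, update messages go
// on the front; local dequeue stays locked until update_done arrives.
struct ReplicatedQueue {
    std::deque<Message> messages;
    bool locked;
    ReplicatedQueue() : locked(true) {}

    void onEnqueue(const Message& m)     { messages.push_back(m); }  // multicast enqueue(q,m)
    void onUpdateFront(const Message& m) { messages.push_front(m); } // update_front(q,m)
    void onUpdateDone()                  { locked = false; }         // unlock local dequeue
};

The reverse iteration is the point: anything a consumer removes before the
iterator reaches it is simply never sent.
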

-The new cluster interface is similar to the MessageStore interface, but
-provides more detail (message positions) and some additional call
-points (e.g. acquire)
+** Cluster API
+
+The new cluster API is similar to the MessageStore interface.
+(Initially I thought it would be an extension of the MessageStore interface,
+but as the design develops it seems better to make it a separate interface.)

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.
-- transactional events.
+
+The cluster will require some extensions to the Queue:
+- Queues can be "locked", locked queues are ignored by IO-driven output.
+- Cluster must be able to apply queue events from the cluster to a queue.
+  These appear to fit into existing queue operations.
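
For illustration, these call points might look roughly like the sketch below.
This is only a possible shape, not a proposed API; all names are invented.

// Sketch only: a possible shape for the cluster interface (names invented).
// Like MessageStore it is a set of call points the broker invokes; the
// cluster code decides what to multicast for each one.
#include <stdint.h>
#include <string>

struct Message;                       // broker message, details elided

struct QueuedMessage {                // message identified by queue + position
    std::string queue;
    uint64_t    position;
};

class Cluster {
  public:
    virtual ~Cluster() {}

    // Wiring changes.
    virtual void queueDeclare(const std::string& queue) = 0;
    virtual void exchangeDeclare(const std::string& exchange, const std::string& type) = 0;
    virtual void bind(const std::string& queue, const std::string& exchange,
                      const std::string& key) = 0;

    // Message life cycle, multicast to the cluster.
    virtual void enqueue(const QueuedMessage& qm, const Message& m) = 0; // push on back of queue
    virtual void acquire(const QueuedMessage& qm) = 0;                   // mark acquired
    virtual void release(const QueuedMessage& qm) = 0;                   // make available again
    virtual void reject(const QueuedMessage& qm) = 0;                    // re-queue as a new message
    virtual void dequeue(const QueuedMessage& qm) = 0;                   // forget
};

The "locked" extension would sit alongside this: a locked queue simply
declines IO-driven output until the cluster unlocks it at the end of an
update.
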
** Maintainability
@@ -270,48 +273,106 @@ A number of specific ways the code will be simplified:

** Performance

-The standalone broker processes _connections_ concurrently, so CPU
-usage increases as you add more connections.
-
-The new cluster processes _queues_ concurrently, so CPU usage increases as you
-add more queues.
-
-In both cases, CPU usage peaks when the number of "units of
- concurrency" (connections or queues) goes above the number of cores.
-
-When all consumers on a queue are connected to the same broker the new
-cluster uses the same messagea allocation threading/logic as a
-standalone broker, with a little extra asynchronous book-keeping.
-
-If a queue has multiple consumers connected to multiple brokers, the
-new cluster time-shares the queue which is less efficient than having
-all consumers on a queue connected to the same broker.
+The only way to verify the relative performance of the new design is
+to prototype & profile. The following points suggest the new design
+may scale/perform better:
+
+Some work moved from virtual synchrony thread to connection threads:
+- All connection/session logic moves to connection thread.
+- Exchange routing logic moves to connection thread.
+- On local broker dequeueing is done in connection thread
+- Local broker dequeue is IO driven as for a standalone broker.
+
+For queues with all consumers on a single node dequeue is all
+IO-driven in connection thread. Pay for time-sharing only if queue has
+consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
+decode/re-encode by re-using some of the original encoded data, this
+could also benefit the stand-alone broker.
+
+** Asynchronous queue replication
+
+The existing "asynchronous queue replication" feature maintains a passive
+backup of queues on a remote broker over a TCP connection.
+
+The new cluster replication protocol could be re-used to implement
+asynchronous queue replication: it's just a special case where the
+active broker is always the queue owner and the enqueue/dequeue
+messages are sent over a TCP connection rather than multicast.
+
+The new update mechanism could also work with 'asynchronous queue
+replication', allowing such replication (over a TCP connection on a
+WAN say) to be initiated after the queue had already been created and
+been in use (one of the key missing features).
+
+** Increasing Concurrency and load sharing
+
+The current cluster is bottlenecked by processing everything in the
+CPG deliver thread. By removing the need for identical operation on
+each broker, we open up the possibility of greater concurrency.
+
+Handling multicast enqueue, acquire, accept, release etc: concurrency
+per queue. Operations on different queues can be done in different
+threads.
+
+The new design does not force each broker to do all the work in the
+CPG thread so spreading load across cluster members should give some
+scale-up.
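
As a rough illustration of concurrency per queue: each queue could own a
small context that runs its events in order while different queues run on
other threads. A minimal sketch, assuming C++11; QueueContext and Event are
invented names, and a real implementation would dispatch onto a fixed worker
pool rather than detaching a thread per burst of events.

// Sketch only: serialize events per queue, run different queues in parallel.
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

class QueueContext {
  public:
    typedef std::function<void()> Event;

    // Post an event (enqueue, acquire, accept, release...) for this queue.
    void post(Event e) {
        std::lock_guard<std::mutex> l(lock);
        events.push_back(e);
        if (!running) {                 // start a drain if none is active
            running = true;
            std::thread(&QueueContext::drain, this).detach();
        }
    }

  private:
    // At most one drain runs per context, so events for the same queue never
    // execute concurrently; contexts for different queues run independently.
    void drain() {
        for (;;) {
            Event e;
            {
                std::lock_guard<std::mutex> l(lock);
                if (events.empty()) { running = false; return; }
                e = events.front();
                events.pop_front();
            }
            e();
        }
    }

    std::mutex lock;
    std::deque<Event> events;
    bool running = false;
};
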
+
+** Misc outstanding issues & notes
+
+Replicating wiring
+- Need async completion of wiring commands?
+- qpid.sequence_counter: need extra work to support in new design, do we care?
+
+Cluster+persistence:
+- finish async completion: dequeue completion for store & cluster
+- cluster restart from store: clean stores *not* identical, pick 1, all others update.
+- need to generate cluster ids for messages recovered from store.
+
+Live updates: we don't need to stall brokers during an update!
+- update on queue-by-queue basis.
+- updatee locks queues during update, no dequeue.
+- update in reverse: don't update messages dequeued during update.
+- updatee adds update messages at front (as normal), replicated messages at back.
+- updater starts from back, sends "update done" when it hits front of queue.
+
+Flow control: need to throttle multicasting
+1. bound the number of outstanding multicasts.
+2. ensure the entire cluster keeps up, no unbounded "lag"
+The existing design uses read-credit to solve 1., and does not solve 2.
+New design should stop reading on all connections while flow control
+condition exists?
+
+Can federation also be unified, at least in configuration?
+
+Consider queues (and exchanges?) as having "reliability" attributes:
+- persistent: is the message stored on disk.
+- backed-up (to another broker): active/passive async replication.
+- replicated (to a cluster): active/active multicast replication to cluster.
+- federated: federation link to a queue/exchange on another broker.
+
+"Reliability" seems right for the first 3 but not for federation, is
+there a better term?
+
+Clustering and scalability: new design may give us the flexibility to
+address scalability as part of cluster design. Think about
+relationship to federation and "fragmented queues" idea.
+
+* Design debates/decisions

-** Flow control
-New design does not queue up CPG delivered messages, they are
-processed immediately in the CPG deliver thread. This means that CPG's
-flow control is sufficient for qpid.
-
-** Live upgrades
-
-Live upgrades refers to the ability to upgrade a cluster while it is
-running, with no downtime. Each brokers in the cluster is shut down,
-and then re-started with a new version of the broker code.
-
-To achieve this
-- Cluster protocl XML file has a new element <version number=N> attached
- to each method. This is the version at which the method was added.
-- New versions can only add methods, existing methods cannot be changed.
-- The cluster handshake for new members includes the protocol version
- at each member.
-- The cluster's version is the lowest version among its members.
-- A newer broker can join and older cluster. When it does, it must restrict
- itself to speaking the older version protocol.
-- When the cluster version increases (because the lowest version member has left)
- the remaining members may move up to the new version.
-
-
-* Design debates

** Active/active vs. active passive

An active-active cluster can be used in an active-passive mode. In
@@ -324,7 +385,7 @@ An active/passive implementation allows some simplifications over
active/active:
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.
@@ -332,17 +393,43 @@ Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover
- Backup brokers are immediately available on failover.
-- As long as a client can connect to any broker in the cluster, it can be served.
+- Some load sharing: reading from client + multicast only done on direct node.
+
+Active/active drawbacks:
+- Co-ordinating message acquisition may impact performance (not tested)
+- Code may be more complex than active/passive.

Active/passive benefits:
-- Don't need to replicate message allocation, can feed consumers at top speed.
+- Don't need message allocation strategy, can feed consumers at top speed.
+- Code may be simpler than active/active.

Active/passive drawbacks:
- All clients on one node so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, may involve multiple connect attempts.
-- No service if a partition separates a client from the active broker,
- even if the client can see other brokers.
+
+** Total ordering.
+
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
+** Message rejection
+
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+New thinking: treat re-queueing after reject as entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.
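
For illustration, the queue-name + position identifier discussed under
"Message identifiers" could be as small as the sketch below (names invented).
Because enqueues are delivered in CPG total order, every broker assigns the
same position to a message independently, so no per-node ID mapping tables
are needed.

// Sketch only: message identified by queue name + position on that queue.
#include <stdint.h>
#include <string>

struct MessageId {
    std::string queue;     // name of the queue the message sits on
    uint64_t    position;  // position assigned when the enqueue is delivered,
                           // identical on every broker given total ordering

    bool operator==(const MessageId& other) const {
        return queue == other.queue && position == other.position;
    }
    bool operator<(const MessageId& other) const {  // usable as a map key
        return queue < other.queue ||
              (queue == other.queue && position < other.position);
    }
};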