Diffstat (limited to 'cpp/design_docs')
-rw-r--r--  cpp/design_docs/hot-standby-design.txt  | 239
-rw-r--r--  cpp/design_docs/new-cluster-design.txt  | 285
-rw-r--r--  cpp/design_docs/new-cluster-plan.txt    | 545
3 files changed, 570 insertions, 499 deletions
diff --git a/cpp/design_docs/hot-standby-design.txt b/cpp/design_docs/hot-standby-design.txt
deleted file mode 100644
index 99a5dc0199..0000000000
--- a/cpp/design_docs/hot-standby-design.txt
+++ /dev/null
@@ -1,239 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Another new design for Qpid clustering.
-
-For background see [[./new-cluster-design.txt]], which describes the issues with the old design and a new active-active design that could replace it.
-
-This document describes an alternative hot-standby approach.
-
-** Delivery guarantee
-
-We guarantee N-way redundant, at-least-once delivery. Once a message from a client has been acknowledged by the broker, it will be delivered even if N-1 brokers subsequently fail. There may be duplicates in the event of a failure, but we don't create duplicates during normal operation (i.e. when no brokers have failed).
-
-This is the same guarantee as the old cluster and the alternative active-active design.
-
-** Active-active vs. hot standby (aka primary-backup)
-
-An active-active cluster allows clients to connect to any broker in the cluster. If a broker fails, clients can fail over to any other live broker.
-
-A hot-standby cluster has only one active broker at a time (the "primary") and one or more brokers on standby (the "backups"). Clients are served only by the primary; clients that connect to a backup are redirected to the primary. The backups are kept up to date in real time by the primary; if the primary fails, a backup is elected to be the new primary.
-
-Aside: a cold-standby cluster is possible using a standalone broker, CMAN and shared storage. In this scenario only one broker runs at a time, writing to a shared store. If it fails, another broker is started (by CMAN) and recovers from the store. This bears investigation, but the store recovery time is probably too long for failover.
-
-** Why hot standby?
-
-Active-active has some advantages:
-- Finding a broker on startup or failover is simple: just pick any live broker.
-- All brokers are always running in active mode, there's no
-- Distributing clients across brokers gives better performance, but see [1].
-- A broker failure affects only the clients connected to that broker.
-
-The main problem with active-active is co-ordinating consumers of the same queue on multiple brokers such that there are no duplicates in normal operation. There are two approaches:
-
-Predictive: each broker predicts which messages the others will take. This was the main weakness of the old design, so it is not appealing.
-
-Locking: brokers "lock" a queue in order to take messages. This is complex to implement, and it is not straightforward to determine the most performant strategy for passing the lock.
-
-Hot standby removes this problem.
-Only the primary can modify queues, so it just has to tell the backups what it is doing; there is no locking.
-
-The primary can enqueue messages and replicate asynchronously - exactly like the store does, but it "writes" to the replicas over the network rather than writing to disk.
-
-** Failover in a hot-standby cluster.
-
-Hot standby has some potential performance issues around failover:
-
-- Failover "spike": when the primary fails, every client will fail over at the same time, putting strain on the system.
-
-- Until a new primary is elected, the cluster cannot serve any clients or redirect clients to the new primary.
-
-We want to minimize the number of re-connect attempts that clients have to make. The cluster can use a well-known algorithm to choose the new primary (e.g. round robin on a known sequence of brokers) so that clients can guess the new primary correctly in most cases.
-
-Even if clients do guess correctly, the new primary may not yet be aware of the death of the old primary, which may cause multiple failed connect attempts before clients eventually get connected. We will need to prototype to see how much this happens in reality and how best to get clients redirected.
-
-** Threading and performance.
-
-The primary-backup cluster operates analogously to the way the disk store does now:
-- use the same MessageStore interface as the store to interact with the broker
-- use the same asynchronous-completion model for replicating messages
-- use the same recovery interfaces (?) for new backups joining.
-
-Re-using the well-established store design gives credibility to the new cluster design.
-
-The single CPG dispatch thread was a severe performance bottleneck for the old cluster. The primary has the same threading model as a standalone broker with a store, which we know performs well.
-
-If we use CPG for replication of messages, the backups will receive messages in the CPG dispatch thread. To get more concurrency, the CPG thread can dump work onto internal PollableQueues to be processed in parallel.
-
-Messages for the same broker queue need to go onto the same PollableQueue. There could be a separate PollableQueue for each broker queue; if that is too resource-intensive we can use a fixed set of PollableQueues and assign broker queues to PollableQueues via hashing or round robin, as sketched below.
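A minimal sketch of the fixed-set-of-PollableQueues idea, assuming nothing about the actual Qpid PollableQueue API (WorkerPool and Event are illustrative names only):

  // Events for the same broker queue always hash to the same work queue,
  // preserving per-queue order while different queues proceed in parallel.
  #include <cstddef>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <string>
  #include <vector>

  struct Event { std::string queueName; std::function<void()> work; };

  class WorkerPool {
    public:
      explicit WorkerPool(std::size_t nWorkers) : queues(nWorkers) {}

      void post(const Event& e) {
          std::size_t i = std::hash<std::string>()(e.queueName) % queues.size();
          std::lock_guard<std::mutex> l(queues[i].lock);
          queues[i].events.push_back(e);   // a worker thread drains this queue
      }

    private:
      struct WorkQueue { std::mutex lock; std::deque<Event> events; };
      std::vector<WorkQueue> queues;       // fixed size, one per worker thread
  };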
-Another possible optimization is to use multiple CPG groups: one per queue or a hashed set, to get more concurrency in the CPG layer. The old cluster is not able to keep CPG busy.
-
-TODO: Transactions pose a challenge for these concurrent models: how to co-ordinate multiple messages being added (commit a publish or roll back an accept) to multiple queues so that all replicas end up with the same message sequence while respecting atomicity.
-
-** Use of CPG
-
-CPG provides several benefits in the old cluster:
-- tracking membership (essential for determining the primary)
-- handling "split brain" (integrates with partition support from CMAN)
-- a reliable multicast protocol to distribute messages.
-
-I believe we still need CPG for membership and split brain. We could experiment with sending the bulk traffic over AMQP connections.
-
-** Flow control
-
-Need to ensure that:
-1) in-memory internal queues used by the cluster don't overflow;
-2) the backups don't fall too far behind on processing CPG messages.
-
-** Recovery
-
-When a new backup joins an active cluster it must get a snapshot from one of the other backups, or from the primary if there are none. In store terms this is "recovery" (the old cluster called it an "update").
-
-Compared to the old cluster we replicate only the well-defined data set of the store. This was the crucial sore spot of the old cluster.
-
-We can also replicate it more efficiently by recovering queues in reverse (LIFO) order. That means that as clients actively consume messages from the front of the queue, they are reducing the work we have to do in recovering from the back. (NOTE: this may not be compatible with using the same recovery interfaces as the store.)
-
-** Selective replication
-
-In this model it's easy to support selective replication of individual queues via configuration:
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate. Treated analogously to the persistent/durable properties for the store.
-- If not explicitly marked, provide a choice of default:
-  - default is replicate (replicated message on replicated queue)
-  - default is don't replicate
-  - default is replicate persistent/durable messages.
-
-** Inconsistent errors
-
-The new design eliminates most sources of inconsistent errors in the old design (connections, sessions, security, management etc.) and eliminates the need to stall the whole cluster till an error is resolved. We still have to handle inconsistent store errors when store and cluster are used together.
-
-We also have to include error handling in the async completion loop to guarantee N-way at-least-once delivery: we should only report success to the client when we know the message was replicated and stored on all N-1 backups.
-
-TODO: We have a lot more options than the old cluster; we need to figure out the best approach, or possibly allow multiple approaches. Need to go through the various failure cases. We may be able to do recovery on a per-queue basis rather than restarting an entire node.
-
-** New members joining
-
-We should be able to catch up much faster than the old design. A new backup can catch up ("recover") the current cluster state on a per-queue basis:
-- queues can be updated in parallel
-- "live" updates avoid the "endless chase".
-
-During a "live" update several things are happening on a queue:
-- clients are publishing messages to the back of the queue, replicated to the backup
-- clients are consuming messages from the front of the queue, replicated to the backup
-- the primary is sending pre-existing messages to the new backup.
-
-The primary sends pre-existing messages in LIFO order - starting from the back of the queue - while clients are consuming from the front. The active consumers actually reduce the amount of work to be done, as there's no need to replicate messages that are no longer on the queue.
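As a rough illustration (not Qpid code) of recovering a queue to a new backup in reverse (LIFO) order: the updater walks from the back of the queue, the backup pushes each message onto the front of its copy, and anything consumed from the front before the walk reaches it never needs to be sent at all.

  #include <deque>
  #include <string>

  struct Message { std::string content; };

  // Stand-in for the remote backup: pushing updates to the *front* of its copy
  // means sending the primary's queue back-to-front reproduces the order.
  struct Backup {
      std::deque<Message> queue;
      bool ready = false;
      void updateFront(const Message& m) { queue.push_front(m); }
      void updateDone() { ready = true; }
  };

  void recoverQueue(const std::deque<Message>& primaryQueue, Backup& backup) {
      // A real implementation must synchronize with concurrent dequeues;
      // this sketch assumes a stable snapshot of the primary's queue.
      for (auto i = primaryQueue.rbegin(); i != primaryQueue.rend(); ++i)
          backup.updateFront(*i);
      backup.updateDone();
  }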
-* Steps to get there
-
-** Baseline replication
-Validate the overall design and get an initial notion of performance. Just message and wiring replication: no update/recovery for new members joining, a single CPG dispatch thread on backups, no failover, no transactions.
-
-** Failover
-Electing the primary, backups redirect to the primary. Measure failover time for a large number of clients. Strategies to minimise the number of retries after a failure.
-
-** Flow Control
-Keep internal queues from overflowing. Similar to internal flow control in the old cluster. Needed for realistic performance/stress tests.
-
-** Concurrency
-Experiment with multiple threads on backups, multiple CPG groups.
-
-** Recovery/new member joining
-Initial status handshake for a new member. Recovering queues from the back.
-
-** Transactions
-TODO: How to implement transactions with concurrency. Worst solution: a global --cluster-use-transactions flag that forces single-threaded mode. Need to find a better solution.
diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt
index a162ea68ec..7adb46fee3 100644
--- a/cpp/design_docs/new-cluster-design.txt
+++ b/cpp/design_docs/new-cluster-design.txt
@@ -17,6 +17,7 @@
 # under the License.
 
 * A new design for Qpid clustering.
+
 ** Issues with current design.
 
 The cluster is based on virtual synchrony: each broker multicasts
@@ -94,9 +95,8 @@
 Use a moving queue ownership protocol to agree order of dequeues. No longer relies on identical state and lock-step behavior to cause identical dequeues on each broker.
 
-Use multiple CPG groups to process different queues in parallel. Use a fixed set of groups and hash queue names to choose the group for each queue.
+Each queue has an associated thread-context. Events for a queue are executed in that queue's context, in parallel with events for other queues.
 
 *** Requirements
@@ -149,7 +149,7 @@
 a release-queue event, allowing another interested broker to take ownership.
 
 *** Asynchronous completion of accept
-
+### HERE
 In acknowledged mode a message is not forgotten until it is accepted, to allow for requeue on rejection or crash. The accept should not be completed till the message has been forgotten.
@@ -162,32 +162,19 @@ On receiving an accept the broker:
 NOTE: The message store does not currently implement asynchronous completions of accept, this is a bug.
 
-*** Multiple CPG groups.
-
-The old cluster was bottlenecked by processing everything in a single CPG deliver thread.
-
-The new cluster uses a set of CPG groups, one per core. Queue names are hashed to give group indexes, so statistically queues are likely to be spread over the set of groups.
-
-Operations on a given queue always use the same group, so we have order within each queue, but operations on different queues can use different groups, giving greater throughput sending to CPG and multiple handler threads to process CPG messages.
-
 ** Inconsistent errors.
 
-An inconsistent error means that after multicasting an enqueue, accept or dequeue, some brokers succeed in processing it and others fail.
+The new design eliminates most sources of inconsistent errors (connections, sessions, security, management etc.) The only points where inconsistent errors can occur are at enqueue and dequeue (most likely store-related errors.)
 
-The new design eliminates most sources of inconsistent errors in the old broker: connections, sessions, security, management etc. Only store journal errors remain.
+The new design can use the existing error-handling protocol with one major improvement: since brokers are no longer required to maintain identical state they do not have to stall processing while an error is being resolved.
 
-The new inconsistent error protocol is similar to the old one with one major improvement: brokers do not have to stall processing while an error is being resolved.
+#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
 
 ** Updating new members
@@ -206,44 +193,60 @@ catch up (which is not guaranteed to happen in a bounded time.)
With the new cluster design only exchanges, queues, bindings and messages need to be replicated. -We update individual objects (queues and exchanges) independently. -- create queues first, then update all queues and exchanges in parallel. -- multiple updater threads, per queue/exchange. +Update of wiring (exchanges, queues, bindings) is the same as current +design. + +Update of messages is different: +- per-queue rather than per-broker, separate queues can be updated in parallel. +- updates queues in reverse order to eliminate unbounded catch-up +- does not require updater & updatee to stall during update. -Queue updater: -- marks the queue position at the sync point -- sends messages starting from the sync point working towards the head of the queue. -- send "done" message. +Replication events, multicast to cluster: +- enqueue(q,m): message m pushed on back of queue q . +- acquire(q,m): mark m acquired +- dequeue(q,m): forget m. +Messages sent on update connection: +- update_front(q,m): during update, receiver pushes m to *front* of q +- update_done(q): during update, update of q is complete. -Queue updatee: -- enqueues received from CPG: add to back of queue as normal. -- dequeues received from CPG: apply if found, else save to check at end of update. -- messages from updater: add to the *front* of the queue. -- update complete: apply any saved dequeues. +Updater: +- when updatee joins set iterator i = q.end() +- while i != q.begin(): --i; send update_front(q,*i) to updatee +- send update_done(q) to updatee -Exchange updater: -- updater: send snapshot of exchange as it was at the sync point. +Updatee: +- q initially in locked state, can't dequeue locally. +- start processing replication events for q immediately (enqueue, dequeue, acquire etc.) +- receive update_front(q,m): q.push_front(m) +- receive update_done(q): q can be unlocked for local dequeing. -Exchange updatee: -- queue exchange operations after the sync point. -- when snapshot is received: apply saved operations. +Benefits: +- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated. +- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update. +- During update consumers actually help by removing messages before they need to be updated. +- Needs no separate "work to do" queue, only the broker queues themselves. -Note: -- Updater is active throughout, no stalling. -- Consuming clients actually reduce the size of the update. -- Updatee stalls clients until the update completes. - (Note: May be possible to avoid updatee stall as well, needs thought) +# TODO how can we recover from updater crashing before update complete? +# Clear queues that are not updated & send request for udpates on those queues? -** Internal cluster interface +# TODO updatee may receive a dequeue for a message it has not yet seen, needs +# to hold on to that so it can drop the message when it is seen. +# Similar problem exists for wiring? -The new cluster interface is similar to the MessageStore interface, but -provides more detail (message positions) and some additional call -points (e.g. acquire) +** Cluster API + +The new cluster API is similar to the MessageStore interface. +(Initially I thought it would be an extension of the MessageStore interface, +but as the design develops it seems better to make it a separate interface.) 
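As a sketch only (illustrative names, not the actual broker::Cluster or MessageStore API), the event set enumerated below might map onto a C++ interface along these lines:

  #include <string>

  struct QueuedMessage;  // message plus queue name and position (opaque here)

  class ClusterEvents {
    public:
      virtual ~ClusterEvents() {}

      // Wiring changes.
      virtual void queueDeclared(const std::string& queue) = 0;
      virtual void exchangeDeclared(const std::string& exchange) = 0;
      virtual void bound(const std::string& exchange, const std::string& queue,
                         const std::string& key) = 0;

      // Message life cycle on a queue.
      virtual void enqueued(const QueuedMessage& qm) = 0;
      virtual void acquired(const QueuedMessage& qm) = 0;
      virtual void released(const QueuedMessage& qm) = 0;
      virtual void rejected(const QueuedMessage& qm) = 0;
      virtual void dequeued(const QueuedMessage& qm) = 0;
  };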
The cluster interface captures these events: - wiring changes: queue/exchange declare/bind - message enqueued/acquired/released/rejected/dequeued. -- transactional events. + +The cluster will require some extensions to the Queue: +- Queues can be "locked", locked queues are ignored by IO-driven output. +- Cluster must be able to apply queue events from the cluster to a queue. + These appear to fit into existing queue operations. ** Maintainability @@ -270,48 +273,106 @@ A number of specific ways the code will be simplified: ** Performance -The standalone broker processes _connections_ concurrently, so CPU -usage increases as you add more connections. - -The new cluster processes _queues_ concurrently, so CPU usage increases as you -add more queues. - -In both cases, CPU usage peaks when the number of "units of - concurrency" (connections or queues) goes above the number of cores. - -When all consumers on a queue are connected to the same broker the new -cluster uses the same messagea allocation threading/logic as a -standalone broker, with a little extra asynchronous book-keeping. - -If a queue has multiple consumers connected to multiple brokers, the -new cluster time-shares the queue which is less efficient than having -all consumers on a queue connected to the same broker. +The only way to verify the relative performance of the new design is +to prototype & profile. The following points suggest the new design +may scale/perform better: + +Some work moved from virtual synchrony thread to connection threads: +- All connection/session logic moves to connection thread. +- Exchange routing logic moves to connection thread. +- On local broker dequeueing is done in connection thread +- Local broker dequeue is IO driven as for a standalone broker. + +For queues with all consumers on a single node dequeue is all +IO-driven in connection thread. Pay for time-sharing only if queue has +consumers on multiple brokers. + +Doing work for different queues in parallel scales on multi-core boxes when +there are multiple queues. + +One difference works against performance, thre is an extra +encode/decode. The old design multicasts raw client data and decodes +it in the virtual synchrony thread. The new design would decode +messages in the connection thread, re-encode them for multicast, and +decode (on non-local brokers) in the virtual synchrony thread. There +is extra work here, but only in the *connection* thread: on a +multi-core machine this happens in parallel for every connection, so +it probably is not a bottleneck. There may be scope to optimize +decode/re-encode by re-using some of the original encoded data, this +could also benefit the stand-alone broker. + +** Asynchronous queue replication + +The existing "asynchronous queue replication" feature maintains a +passive backup passive backup of queues on a remote broker over a TCP +connection. + +The new cluster replication protocol could be re-used to implement +asynchronous queue replication: its just a special case where the +active broker is always the queue owner and the enqueue/dequeue +messages are sent over a TCP connection rather than multicast. + +The new update update mechanism could also work with 'asynchronous +queue replication', allowing such replication (over a TCP connection +on a WAN say) to be initiated after the queue had already been created +and been in use (one of the key missing features). 
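One way to picture the re-use suggested above, as a hedged sketch (none of these types exist in the code base): the broker emits the same replication events whether they are multicast to the cluster or written to a single TCP link for asynchronous queue replication.

  #include <string>
  #include <vector>

  struct ReplicationEvent {
      enum Type { ENQUEUE, ACQUIRE, DEQUEUE } type;
      std::string queue;
      std::string messageId;
  };

  // Transport abstraction: the broker does not care where the events go.
  class EventSender {
    public:
      virtual ~EventSender() {}
      virtual void send(const ReplicationEvent& e) = 0;
  };

  // Active-active cluster: events go to the CPG layer (stubbed as a buffer).
  class MulticastSender : public EventSender {
    public:
      void send(const ReplicationEvent& e) { pending.push_back(e); }
    private:
      std::vector<ReplicationEvent> pending;
  };

  // Asynchronous queue replication: the same events go to one remote broker
  // over a TCP connection (also stubbed as a buffer here).
  class TcpSender : public EventSender {
    public:
      void send(const ReplicationEvent& e) { pending.push_back(e); }
    private:
      std::vector<ReplicationEvent> pending;
  };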
+ +** Increasing Concurrency and load sharing + +The current cluster is bottlenecked by processing everything in the +CPG deliver thread. By removing the need for identical operation on +each broker, we open up the possiblility of greater concurrency. + +Handling multicast enqueue, acquire, accpet, release etc: concurrency +per queue. Operatons on different queues can be done in different +threads. + +The new design does not force each broker to do all the work in the +CPG thread so spreading load across cluster members should give some +scale-up. + +** Misc outstanding issues & notes + +Replicating wiring +- Need async completion of wiring commands? +- qpid.sequence_counter: need extra work to support in new design, do we care? + +Cluster+persistence: +- finish async completion: dequeue completion for store & cluster +- cluster restart from store: clean stores *not* identical, pick 1, all others update. +- need to generate cluster ids for messages recovered from store. + +Live updates: we don't need to stall brokers during an update! +- update on queue-by-queue basis. +- updatee locks queues during update, no dequeue. +- update in reverse: don't update messages dequeued during update. +- updatee adds update messages at front (as normal), replicated messages at back. +- updater starts from back, sends "update done" when it hits front of queue. + +Flow control: need to throttle multicasting +1. bound the number of outstanding multicasts. +2. ensure the entire cluster keeps up, no unbounded "lag" +The existing design uses read-credit to solve 1., and does not solve 2. +New design should stop reading on all connections while flow control +condition exists? + +Can federation also be unified, at least in configuration? + +Consider queues (and exchanges?) as having "reliability" attributes: +- persistent: is the message stored on disk. +- backed-up (to another broker): active/passive async replication. +- replicated (to a cluster): active/active multicast replication to cluster. +- federated: federation link to a queue/exchange on another broker. + +"Reliability" seems right for the first 3 but not for federation, is +there a better term? + +Clustering and scalability: new design may give us the flexibility to +address scalability as part of cluster design. Think about +relationship to federation and "fragmented queues" idea. + +* Design debates/descisions -** Flow control -New design does not queue up CPG delivered messages, they are -processed immediately in the CPG deliver thread. This means that CPG's -flow control is sufficient for qpid. - -** Live upgrades - -Live upgrades refers to the ability to upgrade a cluster while it is -running, with no downtime. Each brokers in the cluster is shut down, -and then re-started with a new version of the broker code. - -To achieve this -- Cluster protocl XML file has a new element <version number=N> attached - to each method. This is the version at which the method was added. -- New versions can only add methods, existing methods cannot be changed. -- The cluster handshake for new members includes the protocol version - at each member. -- The cluster's version is the lowest version among its members. -- A newer broker can join and older cluster. When it does, it must restrict - itself to speaking the older version protocol. -- When the cluster version increases (because the lowest version member has left) - the remaining members may move up to the new version. - - -* Design debates ** Active/active vs. 
active passive An active-active cluster can be used in an active-passive mode. In @@ -324,7 +385,7 @@ An active/passive implementation allows some simplifications over active/active: - can do immediate local enqueue and still guarantee order. Active/passive introduces a few extra requirements: -- Exactly one broker has to take over if primary fails. +- Exactly one broker hast to take over if primary fails. - Passive members must refuse client connections. - On failover, clients must re-try all known addresses till they find the active member. @@ -332,17 +393,43 @@ Active/active benefits: - A broker failure only affects the subset of clients connected to that broker. - Clients can switch to any other broker on failover - Backup brokers are immediately available on failover. -- As long as a client can connect to any broker in the cluster, it can be served. +- Some load sharing: reading from client + multicast only done on direct node. + +Active/active drawbacks: +- Co-ordinating message acquisition may impact performance (not tested) +- Code may be more complex that active/passive. Active/passive benefits: -- Don't need to replicate message allocation, can feed consumers at top speed. +- Don't need message allocation strategy, can feed consumers at top speed. +- Code may be simpler than active/active. Active/passive drawbacks: - All clients on one node so a failure affects every client in the system. - After a failure there is a "reconnect storm" as every client reconnects to the new active node. - After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary. - Clients must find the single active node, may involve multiple connect attempts. -- No service if a partition separates a client from the active broker, - even if the client can see other brokers. +** Total ordering. + +Initial thinking: allow message ordering to differ between brokers. +New thinking: use CPG total ordering, get identical ordering on all brokers. +- Allowing variation in order introduces too much chance of unexpected behavior. +- Usign total order allows other optimizations, see Message Identifiers below. + +** Message identifiers. + +Initial thinking: message ID = CPG node id + 64 bit sequence number. +This involves a lot of mapping between cluster IDs and broker messsages. + +New thinking: message ID = queue name + queue position. +- Removes most of the mapping and memory management for cluster code. +- Requires total ordering of messages (see above) + +** Message rejection + +Initial thinking: add special reject/rejected points to cluster interface so +rejected messages could be re-queued without multicast. +New thinking: treat re-queueing after reject as entirely new message. +- Simplifies cluster interface & implementation +- Not on the critical path. diff --git a/cpp/design_docs/new-cluster-plan.txt b/cpp/design_docs/new-cluster-plan.txt index 32e3f710e7..781876e55a 100644 --- a/cpp/design_docs/new-cluster-plan.txt +++ b/cpp/design_docs/new-cluster-plan.txt @@ -17,150 +17,376 @@ # specific language governing permissions and limitations # under the License. -* Status of impementation -Meaning of priorities: -[#A] Essential for basic functioning. -[#B] Required for first release. -[#C] Can be addressed in a later release. +Notes on new cluster implementation. See also: new-cluster-design.txt -The existig prototype is bare bones to do performance benchmarks: -- Implements publish and consumer locking protocol. 
-- Defered delivery and asynchronous completion of message. -- Optimize the case all consumers are on the same node. -- No new member updates, no failover updates, no transactions, no persistence etc. +* Implementation plan. -Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/ +Co-existence with old cluster code and tests: +- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. +- Double up tests with old version/new version as the new code develops. -** Similarities to existing cluster. +Minimal POC for message delivery & perf test. +- no wiring replication, no updates, no failover, no persistence, no async completion. +- just implement publish and acquire/dequeue locking protocol. +- optimize the special case where all consumers are on the same node. +- measure performance: compare active-passive and active-active modes of use. -/Active-active/: the new cluster can be a drop-in replacement for the -old, existing tests & customer deployment configurations are still -valid. +Full implementation of transient cluster +- Update (based on existing update), async completion etc. +- Passing all existing transient cluster tests. -/Virtual synchrony/: Uses corosync to co-ordinate activity of members. +Persistent cluster +- Make sure async completion works correctly. +- InitialStatus protoocl etc. to support persistent start-up (existing code) +- cluster restart from store: stores not identical. Load one, update the rest. + - assign cluster ID's to messages recovered from store, don't replicate. -/XML controls/: Uses XML to define the primitives multicast to the -cluster. +Improved update protocol +- per-queue, less stalling, bounded catch-up. -** Differences with existing cluster. +* Task list -/Report rather than predict consumption/: brokers explicitly tell each -other which messages have been acquired or dequeued. This removes the -major cause of bugs in the existing cluster. +** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. -/Queue consumer locking/: to avoid duplicates only one broker can acquire or -dequeue messages at a time - while has the consume-lock on the -queue. If multiple brokers are consuming from the same queue the lock -is passed around to time-share access to the queue. +NOTE: as implementation questions arise, take the easiest option and make +a note for later optimization/improvement. -/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting -the concurrency of the host) to allow concurrent processing on -different queues. Queues are hashed onto the groups. +*** Tests +- python test: 4 senders, numbered messages, 4 receivers, verify message set. +- acquire then release messages: verify can be dequeued on any member +- acquire then kill broker: verify can be dequeued other members. +- acquire then reject: verify goes on alt-exchange once only. -* Completed tasks -** DONE [#A] Minimal POC: publish/acquire/dequeue protocol. - CLOSED: [2011-10-05 Wed 16:03] +*** DONE broker::Cluster interface and call points. -Defines broker::Cluster interface and call points. -Initial interface commite +Initial interface commited. -Main classes -Core: central object holding cluster classes together (replaces cluster::Cluster) -BrokerContext: implements broker::Cluster interface. -QueueContext: Attached to a broker::Queue, holds cluster status. -MessageHolder:holds local messages while they are being enqueued. +*** Main classes -Implements multiple CPG groups for better concurrency. 
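A rough sketch of the per-queue cluster state mentioned above (a context object attached to each broker queue, with queues hashed onto a fixed set of CPG groups); the names are illustrative rather than the prototype's actual classes:

  #include <cstddef>
  #include <functional>
  #include <map>
  #include <string>

  struct QueueContext {
      std::size_t cpgGroup;   // which CPG group carries this queue's traffic
      bool owned;             // does the local broker currently own the queue?
      QueueContext() : cpgGroup(0), owned(false) {}
  };

  class Core {
    public:
      explicit Core(std::size_t nGroups) : nGroups(nGroups) {}

      // Find or create the context for a queue. Hashing the queue name means
      // a given queue always uses the same CPG group, preserving its order.
      QueueContext& context(const std::string& queue) {
          QueueContext& qc = contexts[queue];
          qc.cpgGroup = std::hash<std::string>()(queue) % nGroups;
          return qc;
      }

    private:
      std::size_t nGroups;
      std::map<std::string, QueueContext> contexts;
  };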
+BrokerHandler: +- implements broker::Cluster intercept points. +- sends mcast events to inform cluster of local actions. +- thread safe, called in connection threads. -** DONE [#A] Large message replication. - CLOSED: [2011-10-05 Wed 17:22] -Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame) +LocalMessageMap: +- Holds local messages while they are being enqueued. +- thread safe: called by both BrokerHandler and MessageHandler + +MessageHandler: +- handles delivered mcast messages related to messages. +- initiates local actions in response to mcast events. +- thread unsafe, only called in deliver thread. +- maintains view of cluster state regarding messages. -* Open questions +QueueOwnerHandler: +- handles delivered mcast messages related to queue consumer ownership. +- thread safe, called in deliver, connection and timer threads. +- maintains view of cluster state regarding queue ownership. + +cluster::Core: class to hold new cluster together (replaces cluster::Cluster) +- thread safe: manage state used by both MessageHandler and BrokerHandler + +The following code sketch illustrates only the "happy path" error handling +is omitted. + +*** BrokerHandler +Types: +- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; } +- struct + +NOTE: +- Messages on queues are identified by a queue name + a position. +- Messages being routed are identified by a sequence number. + +Members: +- thread_local bool noReplicate // suppress replication. +- thread_local bool isRouting // suppress operations while routing +- Message localMessage[SequenceNumber] // local messages being routed. +- thread_local SequenceNumber routingSequence + +NOTE: localMessage is also modified by MessageHandler. + +broker::Cluster intercept functions: + +routing(msg) + if noReplicate: return + # Supress everything except enqueues while we are routing. + # We don't want to replicate acquires & dequeues caused by an enqueu, + # e.g. removal of messages from ring/LV queues. + isRouting = true + +enqueue(qmsg): + if noReplicate: return + if routingSequence == 0 # thread local + routingSequence = nextRoutingSequence() + mcast create(encode(qmsg.msg),routingSeq) + mcast enqueue(qmsg.q,routingSeq) + +routed(msg): + if noReplicate: return + isRouting = false + +acquire(qmsg): + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + if msg.id: mcast acquire(qmsg) + +release(QueuedMessage) + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + mcast release(qmsg) + +accept(QueuedMessage): + if noReplicate: return + if isRouting: return # Ignore while we are routing a message. + mcast accept(qmsg) + +reject(QueuedMessage): + isRejecting = true + mcast reject(qmsg) + +# FIXME no longer needed? 
+drop(QueuedMessage) + cleanup(qmsg) + +*** MessageHandler and mcast messages +Types: +- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } +- struct QueueKey { MessageId id; QueueName q; } +- typedef map<QueueKey, QueueEntry> Queue +- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; } + +Members: +- QueueEntry enqueued[QueueKey] +- Node node[NodeId] + +Mcast messages in Message class: + +create(msg,seq) + if sender != self: node[sender].routing[seq] = decode(msg) + +enqueue(q,seq): + id = (sender,seq) + if sender == self: + enqueued[id,q] = (localMessage[seq], acquired=None) + else: + msg = sender.routing[seq] + enqueued[id,q] = (qmsg, acquired=None) + with noReplicate=true: qmsg = broker.getQueue(q).push(msg) + +routed(seq): + if sender == self: localMessage.erase(msg.id.seq) + else: sender.routing.erase(seq) + +acquire(id,q): + enqueued[id,q].acquired = sender + node[sender].acquired.push_back((id,q)) + if sender != self: + with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q]) + +release(id,q) + enqueued[id,q].acquired = None + node[sender].acquired.erase((id,q)) + if sender != self + with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q]) + +reject(id,q): + sender.routing[id] = enqueued[id,q] # prepare for re-queueing + +rejected(id,q) + sender.routing.erase[id] + +dequeue(id,q) + entry = enqueued[id,q] + enqueued.erase[id,q] + node[entry.acquired].acquired.erase(id,q) + if sender != self: + with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg) + +member m leaves cluster: + for key in node[m].acquired: + release(key.id, key.q) + node.erase(m) + +*** Queue consumer locking + +When a queue is locked it does not deliver messages to its consumers. + +New broker::Queue functions: +- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit. +- startConsumers(): reset consumersStopped flag + +Implementation sketch, locking omitted: + +void Queue::stopConsumers() { + consumersStopped = true; + while (consumersBusy) consumersBusyMonitor.wait(); +} + +void Queue::startConsumers() { + consumersStopped = false; + listeners.notify(); +} + +bool Queue::dispatch(consumer) { + if (consumersStopped) return false; + ++consumersBusy; + do_regular_dispatch_body() + if (--consumersBusy == 0) consumersBusyMonitor.notify(); +} + +*** QueueOwnerHandler + +Invariants: +- Each queue is owned by at most one node at any time. +- Each node is interested in a set of queues at any given time. +- A queue is un-owned if no node is interested. + +The queue owner releases the queue when +- it loses interest i.e. queue has no consumers with credit. +- a configured time delay expires and there are other interested nodes. + +The owner mcasts release(q). On delivery the new queue owner is the +next node in node-id order (treating nodes as a circular list) +starting from the old owner that is interested in the queue. + +Queue consumers initially are stopped, only started when we get +ownership from the cluster. + +Thread safety: called by deliver, connection and timer threads, needs locking. + +Thread safe object per queue holding queue ownership status. +Called by deliver, connection and timer threads. 
+ +class QueueOwnership { + bool owned; + Timer timer; + BrokerQueue q; + + drop(): # locked + if owned: + owned = false + q.stopConsumers() + mcast release(q.name, false) + timer.stop() + + take(): # locked + if not owned: + owned = true + q.startConsumers() + timer.start(timeout) + + timer.fire(): drop() +} + +Data Members, only modified/examined in deliver thread: +- typedef set<NodeId> ConsumerSet +- map<QueueName, ConsumerSet> consumers +- map<QueueName, NodeId> owner -** TODO [#A] Queue sequence numbers vs. independant message IDs. - SCHEDULED: <2011-10-07 Fri> +Thread safe data members, accessed in connection threads (via BrokerHandler): +- map<QueueName, QueueOwnership> ownership -Current prototype uses queue sequence numbers to identify -message. This is tricky for updating new members as the sequence -numbers are only known on delivery. +Multicast messages in QueueOwner class: -Independent message IDs that can be generated and sent with the message simplify -this and potentially allow performance benefits by relaxing total ordering. -However they imply additional map lookups that might hurt performance. +consume(q): + if sender==self and consumers[q].empty(): ownership[q].take() + consumers[q].insert(sender) -- [ ] Prototype independent message IDs, check performance. +release(q): + asssert(owner[q] == sender and owner[q] in consumers[q]) + owner[q] = circular search from sender in consumers[q] + if owner==self: ownership[q].take() -* Outstanding Tasks -** TODO [#A] Defer and async completion of wiring commands. +cancel(q): + assert(queue[q].owner != sender) # sender must release() before cancel() + consumers[q].erase(sender) -Testing requirement: Many tests assume wiring changes are visible -across the cluster once the commad completes. +member-leaves: + for q in queue: if owner[q] = left: left.release(q) -Name clashes: need to avoid race if same name queue/exchange declared -on 2 brokers simultaneously +Need 2 more intercept points in broker::Cluster: -** TODO [#A] Passing all existing cluster tests. +consume(q,consumer,consumerCount) - Queue::consume() + if consumerCount == 1: mcast consume(q) -The new cluster should be a drop-in replacement for the old, so it -should be able to pass all the existing tests. +cancel(q,consumer,consumerCount) - Queue::cancel() + if consumerCount == 0: + ownership[q].drop() + mcast cancel(q) -** TODO [#A] Update to new members joining. +#TODO: lifecycle, updating cluster data structures when queues are destroyed + +*** Increasing concurrency +The major performance limitation of the old cluster is that it does +everything in the single CPG deliver thread context. + +We can get additional concurrency by creating a thread context _per queue_ +for queue operations: enqueue, acquire, accept etc. + +We associate a PollableQueue of queue operations with each AMQP queue. +The CPG deliver thread would +- build messages and associate with cluster IDs. +- push queue ops to the appropriate PollableQueue to be dispatched the queues thread. + +Serializing operations on the same queue avoids contention, but takes advantage +of the independence of operations on separate queues. -Need to resolve [[Queue sequence numbers vs. independant message IDs]] first. -- implicit sequence numbers are more tricky to replicate to new member. +*** Re-use of existing cluster code +- re-use Event +- re-use Multicaster +- re-use same PollableQueueSetup (may experiment later) +- new Core class to replace Cluster. +- keep design modular, keep threading rules clear. 
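The "circular search" that picks the next owner when a queue is released might look roughly like this (illustrative sketch; NodeId 0 is used here as a "no owner" sentinel, which is an assumption, not existing code):

  #include <set>

  typedef unsigned long NodeId;

  // Starting from the old owner, take the next interested node in node-id
  // order, wrapping around; return 0 if no node is interested.
  NodeId nextOwner(const std::set<NodeId>& interested, NodeId oldOwner) {
      if (interested.empty()) return 0;
      std::set<NodeId>::const_iterator i = interested.upper_bound(oldOwner);
      if (i == interested.end()) i = interested.begin();   // wrap around
      return *i;
  }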
-Update individual objects (queues and exchanges) independently. -- create queues first, then update all queues and exchanges in parallel. -- multiple updater threads, per queue/exchange. -- updater sends messages to special exchange(s) (not using extended AMQP controls) +** TODO [#B] Large message replication. +Multicast should encode messages in fixed size buffers (64k)? +Can't assume we can send message in one chunk. +For 0-10 can use channel numbers & send whole frames packed into larger buffer. +** TODO [#B] Transaction support. +Extend broker::Cluster interface to capture transaction context and completion. +Sequence number to generate per-node tx IDs. +Replicate transaction completion. +** TODO [#B] Batch CPG multicast messages +The new cluster design involves a lot of small multicast messages, +they need to be batched into larger CPG messages for efficiency. +** TODO [#B] Genuine async completion +Replace current synchronous waiting implementation with genuine async completion. -Queue updater: -- marks the queue position at the sync point -- sends messages starting from the sync point working towards the head of the queue. -- send "done" message. -Note: updater remains active throughout, consuming clients actually reduce the -size of the update. +Test: enhance test_store.cpp to defer enqueueComplete till special message received. -Queue updatee: -- enqueues received from CPG: add to back of queue as normal. -- dequeues received from CPG: apply if found, else save to check at end of update. -- messages from updater: add to the *front* of the queue. -- update complete: apply any saved dequeues. +Async callback uses *requestIOProcessing* to queue action on IO thread. -Exchange updater: -- updater: send snapshot of exchange as it was at the sync point. +** TODO [#B] Async completion of accept when dequeue completes. +Interface is already there on broker::Message, just need to ensure +that store and cluster implementations call it appropriately. -Exchange updatee: -- queue exchange operations after the sync point. -- when snapshot is received: apply saved operations. +** TODO [#B] Replicate wiring. +From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command. -Updater remains active throughout. -Updatee stalls clients until the update completes. +** TODO [#B] New members joining - first pass -Updating queue/exchange/binding objects is via the same encode/decode -that is used by the store. Updatee to use recovery interfaces to -recover? +Re-use update code from old cluster but don't replicate sessions & +connections. -** TODO [#A] Failover updates to client. -Implement the amq.failover exchange to notify clients of membership. +Need to extend it to send cluster IDs with messages. -** TODO [#B] Initial status protocol. -Handshake to give status of each broker member to new members joining. -Status includes -- persistent store state (clean, dirty) -- cluster protocol version. +Need to replicate the queue ownership data as part of the update. -** TODO [#B] Persistent cluster support. -Initial status protoocl to support persistent start-up (see existing code) +** TODO [#B] Persistence support. +InitialStatus protoocl etc. to support persistent start-up (existing code) Only one broker recovers from store, update to others. Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover. -** TODO [#B] Management support -Replicate management methods that modify queues - e.g. move, purge. 
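As a rough illustration of the "Batch CPG multicast messages" item above (illustrative code, not the existing Multicaster): pack small encoded events into a fixed-size 64k buffer and hand the whole buffer to CPG when it fills or is flushed.

  #include <cstddef>
  #include <functional>
  #include <vector>

  class EventBatcher {
    public:
      static const std::size_t MAX_BUFFER = 64 * 1024;

      // 'mcast' stands in for whatever actually sends one CPG message.
      explicit EventBatcher(std::function<void(const std::vector<char>&)> mcast)
          : send(mcast) { buffer.reserve(MAX_BUFFER); }

      // Append one encoded event, flushing first if it would overflow.
      // Events larger than MAX_BUFFER would need fragmenting (not shown).
      void add(const char* data, std::size_t size) {
          if (buffer.size() + size > MAX_BUFFER) flush();
          buffer.insert(buffer.end(), data, data + size);
      }

      void flush() {
          if (buffer.empty()) return;
          send(buffer);
          buffer.clear();
      }

    private:
      std::function<void(const std::vector<char>&)> send;
      std::vector<char> buffer;
  };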
+** TODO [#B] Handle other ways that messages can leave a queue. + +Other ways (other than via a consumer) that messages are take off a queue. + +NOTE: Not controlled by queue lock, how to make them consistent? + Target broker may not have all messages on other brokers for purge/destroy. - Queue::move() - need to wait for lock? Replicate? - Queue::get() - ??? @@ -169,38 +395,66 @@ Target broker may not have all messages on other brokers for purge/destroy. Need to add callpoints & mcast messages to replicate these? -** TODO [#B] TX transaction support. -Extend broker::Cluster interface to capture transaction context and completion. -Running brokers exchange TX information. -New broker update includes TX information. - - // FIXME aconway 2010-10-18: As things stand the cluster is not - // compatible with transactions - // - enqueues occur after routing is complete - // - no call to Cluster::enqueue, should be in Queue::process? - // - no transaction context associated with messages in the Cluster interface. - // - no call to Cluster::accept in Queue::dequeueCommitted - -** TODO [#B] DTX transaction support. -Extend broker::Cluster interface to capture transaction context and completion. -Running brokers exchange DTX information. -New broker update includes DTX information. - -** TODO [#B] Async completion of accept. -When this is fixed in the standalone broker, it should be fixed for cluster. - -** TODO [#B] Network partitions and quorum. -Re-use existing implementation. +** TODO [#B] Flow control for internal queues. + +Need to bound the size of internal queues: delivery and multicast. +- stop polling for read on client connections when we reach a bound. +- restart polling when we get back under it. + +That will stop local multicasting, we still have to deal with remote +multicasting (note existing cluster does not do this.) Something like: +- when over bounds multicast a flow-control event. +- on delivery of flow-control all members stop polling to read client connections +- when back under bounds send flow-control-end, all members resume +- if flow-controling member dies others resume + +** TODO [#B] Integration with transactions. +Do we want to replicate during transaction & replicate commit/rollback +or replicate only on commit? +No integration with DTX transactions. +** TODO [#B] Make new cluster work with replication exchange. +Possibly re-use some common logic. Replication exchange is like clustering +except over TCP. +** TODO [#B] Better concurrency, scalabiility on multi-cores. +Introduce PollableQueue of operations per broker queue. Queue up mcast +operations (enqueue, acquire, accept etc.) to be handled concurrently +on different queue. Performance testing to verify improved scalability. +** TODO [#C] Async completion for declare, bind, destroy queues and exchanges. +Cluster needs to complete these asynchronously to guarantee resources +exist across the cluster when the command completes. ** TODO [#C] Allow non-replicated exchanges, queues. -Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects. +Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects. - save replicated status to store. - support in management tools. +Replicated exchange: replicate binds to replicated queues. Replicated queue: replicate all messages. -Replicated exchange: replicate bindings to replicated queues only. -Configurable default? Defaults to true. +** TODO [#C] New members joining - improved. 
+ +Replicate wiring like old cluster, stall for wiring but not for +messages. Update messages on a per-queue basis from back to front. + +Updater: +- stall & push wiring: declare exchanges, queues, bindings. +- start update iterator thread on each queue. +- unstall and process normally while iterator threads run. + +Update iterator thread: +- starts at back of updater queue, message m. +- send update_front(q,m) to updatee and advance towards front +- at front: send update_done(q) + +Updatee: +- stall, receive wiring, lock all queues, mark queues "updating", unstall +- update_front(q,m): push m to *front* of q +- update_done(q): mark queue "ready" + +Updatee cannot take the queue consume lock for a queue that is updating. +Updatee *can* push messages onto a queue that is updating. + +TODO: Is there any way to eliminate the stall for wiring? ** TODO [#C] Refactoring of common concerns. @@ -215,40 +469,9 @@ Look for ways to capitalize on the similarity & simplify the code. In particular QueuedEvents (async replication) strongly resembles cluster replication, but over TCP rather than multicast. - +** TODO [#C] Concurrency for enqueue events. +All enqueue events are being processed in the CPG deliver thread context which +serializes all the work. We only need ordering on a per queue basis, can we +enqueue in parallel on different queues and will that improve performance? ** TODO [#C] Handling immediate messages in a cluster Include remote consumers in descision to deliver an immediate message? -** TODO [#C] Remove old cluster hacks and workarounds -The old cluster has workarounds in the broker code that can be removed. -- [ ] drop code to replicate management model. -- [ ] drop timer workarounds for TTL, management, heartbeats. -- [ ] drop "cluster-safe assertions" in broker code. -- [ ] drop connections, sessions, management from cluster update. -- [ ] drop security workarounds: cluster code now operates after message decoding. -- [ ] drop connection tracking in cluster code. -- [ ] simper inconsistent-error handling code, no need to stall. -** TODO [#C] Support for live upgrades. - -Allow brokers in a running cluster to be replaced one-by-one with a new version. - -The old cluster protocol was unstable because any changes in broker -state caused changes to the cluster protocol.The new design should be -much more stable. - -Points to implement: -- Brokers should ignore unknown controls (with a warning) rather than an error. -- Limit logging frequency for unknown control warnings. -- Add a version number at front of every CPG message. Determines how the - rest of the message is decoded. (allows for entirely new encodings e.g. AMQP 1.0) -- Protocol version XML element in cluster.xml, on each control. -- Initial status protocol to include protocol version number. - -** TODO [#C] Support for AMQP 1.0. - -* Testing -** TODO [#A] Pass all existing cluster tests. -Requires [[Defer and async completion of wiring commands.]] -** TODO [#A] New cluster tests. -Stress tests & performance benchmarks focused on changes in new cluster: -- concurrency by queues rather than connections. -- different handling shared queues when consuemrs are on different brokers. |
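To make the updatee behaviour described under "New members joining - improved" concrete, a minimal sketch (illustrative types, not broker::Queue): the queue starts locked for local consumers, update_front pushes older messages onto the front, replicated enqueues still go on the back, and update_done makes it ready.

  #include <deque>
  #include <string>

  struct Message { std::string content; };

  class UpdateeQueue {
    public:
      UpdateeQueue() : updating(true) {}                       // starts locked

      void enqueue(const Message& m) { q.push_back(m); }       // replicated enqueue
      void updateFront(const Message& m) { q.push_front(m); }  // from the updater
      void updateDone() { updating = false; }                  // ready for consumers

      // Local consumers may only be fed once the update has finished.
      bool dispatchAllowed() const { return !updating; }

    private:
      std::deque<Message> q;
      bool updating;
  };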
