Diffstat (limited to 'cpp/design_docs/new-cluster-design.txt')
-rw-r--r--  cpp/design_docs/new-cluster-design.txt  |  285
1 files changed, 186 insertions, 99 deletions
diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt
index a162ea68ec..7adb46fee3 100644
--- a/cpp/design_docs/new-cluster-design.txt
+++ b/cpp/design_docs/new-cluster-design.txt
@@ -17,6 +17,7 @@
# under the License.
* A new design for Qpid clustering.
+
** Issues with current design.
The cluster is based on virtual synchrony: each broker multicasts
@@ -94,9 +95,8 @@ Use a moving queue ownership protocol to agree order of dequeues.
No longer relies on identical state and lock-step behavior to cause
identical dequeues on each broker.
-Use multiple CPG groups to process different queues in parallel. Use a
-fixed set of groups and hash queue names to choose the group for each
-queue.
+Each queue has an associated thread-context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.
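+
+As a rough sketch of this threading model (illustrative only, not the
+actual Qpid classes; QueueContext and post() are hypothetical names),
+each queue could own a serial work queue drained by its own worker
+thread:
+
+  // Sketch only: events posted to one context run in order on that
+  // context's thread; different contexts run in parallel.
+  #include <condition_variable>
+  #include <functional>
+  #include <mutex>
+  #include <queue>
+  #include <thread>
+
+  class QueueContext {
+      std::mutex lock;
+      std::condition_variable wake;
+      std::queue<std::function<void()> > work;
+      bool stopped;
+      std::thread worker;
+
+      void run() {
+          std::unique_lock<std::mutex> l(lock);
+          while (!stopped || !work.empty()) {
+              if (work.empty()) { wake.wait(l); continue; }
+              std::function<void()> event = work.front();
+              work.pop();
+              l.unlock();
+              event();          // events for this queue execute strictly in order
+              l.lock();
+          }
+      }
+
+    public:
+      QueueContext() : stopped(false), worker(&QueueContext::run, this) {}
+
+      // Called from connection or CPG deliver threads to hand an event
+      // to this queue's context.
+      void post(std::function<void()> event) {
+          { std::lock_guard<std::mutex> l(lock); work.push(event); }
+          wake.notify_one();
+      }
+
+      ~QueueContext() {
+          { std::lock_guard<std::mutex> l(lock); stopped = true; }
+          wake.notify_one();
+          worker.join();
+      }
+  };
+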
*** Requirements
@@ -149,7 +149,7 @@ a release-queue event, allowing another interested broker to take
ownership.
*** Asynchronous completion of accept
-
+
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.
@@ -162,32 +162,19 @@ On receiving an accept the broker:
NOTE: The message store does not currently implement asynchronous
completions of accept; this is a bug.
-*** Multiple CPG groups.
-
-The old cluster was bottlenecked by processing everything in a single
-CPG deliver thread.
-
-The new cluster uses a set of CPG groups, one per core. Queue names
-are hashed to give group indexes, so statistically queues are likely
-to be spread over the set of groups.
-
-Operations on a given queue always use the same group, so we have
-order within each queue, but operations on different queues can use
-different groups giving greater throughput sending to CPG and multiple
-handler threads to process CPG messages.
-
** Inconsistent errors.
-An inconsistent error means that after multicasting an enqueue, accept
-or dequeue, some brokers succeed in processing it and others fail.
+The new design eliminates most sources of inconsistent errors
+(connections, sessions, security, management etc.). The only points
+where inconsistent errors can occur are at enqueue and dequeue (most
+likely store-related errors).
-The new design eliminates most sources of inconsistent errors in the
-old broker: connections, sessions, security, management etc. Only
-store journal errors remain.
+The new design can use the existing error-handling protocol with one
+major improvement: since brokers are no longer required to maintain
+identical state they do not have to stall processing while an error is
+being resolved.
-The new inconsistent error protocol is similar to the old one with one
-major improvement: brokers do not have to stall processing while an
-error is being resolved.
+#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
** Updating new members
@@ -206,44 +193,60 @@ catch up (which is not guaranteed to happen in a bounded time.)
With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.
-We update individual objects (queues and exchanges) independently.
-- create queues first, then update all queues and exchanges in parallel.
-- multiple updater threads, per queue/exchange.
+Update of wiring (exchanges, queues, bindings) is the same as in the
+current design.
+
+Update of messages is different:
+- per-queue rather than per-broker; separate queues can be updated in parallel.
+- updates each queue in reverse order (back to front) to eliminate unbounded catch-up.
+- does not require updater & updatee to stall during update.
-Queue updater:
-- marks the queue position at the sync point
-- sends messages starting from the sync point working towards the head of the queue.
-- send "done" message.
+Replication events, multicast to cluster:
+- enqueue(q,m): message m pushed on back of queue q.
+- acquire(q,m): mark m acquired
+- dequeue(q,m): forget m.
+Messages sent on update connection:
+- update_front(q,m): during update, receiver pushes m to *front* of q
+- update_done(q): during update, update of q is complete.
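+
+As an illustration only (hypothetical type names, not the actual
+protocol definitions), the event set above might be represented
+roughly like this:
+
+  #include <cstdint>
+  #include <string>
+
+  struct QueueEvent {
+      enum Type {
+          ENQUEUE,       // multicast: push message m on the back of queue q
+          ACQUIRE,       // multicast: mark m acquired
+          DEQUEUE,       // multicast: forget m
+          UPDATE_FRONT,  // update connection: push m on the *front* of q
+          UPDATE_DONE    // update connection: update of q is complete
+      };
+
+      Type        type;
+      std::string queue;    // q: the queue name
+      uint64_t    position; // identifies m by its position in q (see Message identifiers)
+      std::string payload;  // encoded message body, only for ENQUEUE/UPDATE_FRONT
+  };
+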
-Queue updatee:
-- enqueues received from CPG: add to back of queue as normal.
-- dequeues received from CPG: apply if found, else save to check at end of update.
-- messages from updater: add to the *front* of the queue.
-- update complete: apply any saved dequeues.
+Updater:
+- when updatee joins, set iterator i = q.end()
+- while i != q.begin(): --i; send update_front(q,*i) to updatee
+- send update_done(q) to updatee
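+
+A minimal sketch of the loop above, assuming a std::deque stands in
+for the broker queue and sendUpdateFront/sendUpdateDone are
+hypothetical helpers that write to the update connection:
+
+  #include <deque>
+  #include <iostream>
+  #include <string>
+
+  typedef std::string Message;
+
+  // Stand-ins for the update connection (hypothetical; print for demonstration).
+  void sendUpdateFront(const std::string& q, const Message& m) {
+      std::cout << "update_front(" << q << ", " << m << ")\n";
+  }
+  void sendUpdateDone(const std::string& q) {
+      std::cout << "update_done(" << q << ")\n";
+  }
+
+  // Walk from the back of the queue towards the front; the updatee pushes
+  // each message to the *front* of its copy, so the original order is kept.
+  void updateQueue(const std::string& name, const std::deque<Message>& q) {
+      std::deque<Message>::const_iterator i = q.end();
+      while (i != q.begin()) {
+          --i;
+          sendUpdateFront(name, *i);
+      }
+      sendUpdateDone(name);
+  }
+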
-Exchange updater:
-- updater: send snapshot of exchange as it was at the sync point.
+Updatee:
+- q initially in locked state, can't dequeue locally.
+- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
+- receive update_front(q,m): q.push_front(m)
+- receive update_done(q): q can be unlocked for local dequeuing.
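+
+Correspondingly, a sketch of the updatee side (class and member names
+hypothetical; a std::deque stands in for the broker queue and dequeue
+is simplified to popping the front):
+
+  #include <deque>
+  #include <string>
+
+  typedef std::string Message;
+
+  class UpdateeQueue {
+      std::deque<Message> messages;
+      bool locked;                       // while locked, no local IO-driven dequeue
+    public:
+      UpdateeQueue() : locked(true) {}   // locked until update_done arrives
+
+      // Replication events from the cluster, applied as soon as they arrive.
+      void enqueue(const Message& m) { messages.push_back(m); }
+      void dequeue() { if (!messages.empty()) messages.pop_front(); } // simplified
+
+      // Messages from the updater go on the *front*, ahead of live enqueues.
+      void updateFront(const Message& m) { messages.push_front(m); }
+
+      // End of update: the queue may now feed local consumers.
+      void updateDone() { locked = false; }
+
+      bool canDequeueLocally() const { return !locked; }
+  };
+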
-Exchange updatee:
-- queue exchange operations after the sync point.
-- when snapshot is received: apply saved operations.
+Benefits:
+- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
+- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
+- During update consumers actually help by removing messages before they need to be updated.
+- Needs no separate "work to do" queue, only the broker queues themselves.
-Note:
-- Updater is active throughout, no stalling.
-- Consuming clients actually reduce the size of the update.
-- Updatee stalls clients until the update completes.
- (Note: May be possible to avoid updatee stall as well, needs thought)
+# TODO how can we recover from updater crashing before update complete?
+# Clear queues that are not updated & send request for updates on those queues?
-** Internal cluster interface
+# TODO updatee may receive a dequeue for a message it has not yet seen, needs
+# to hold on to that so it can drop the message when it is seen.
+# Similar problem exists for wiring?
-The new cluster interface is similar to the MessageStore interface, but
-provides more detail (message positions) and some additional call
-points (e.g. acquire)
+** Cluster API
+
+The new cluster API is similar to the MessageStore interface.
+(Initially I thought it would be an extension of the MessageStore interface,
+but as the design develops it seems better to make it a separate interface.)
The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.
-- transactional events.
+
+The cluster will require some extensions to the Queue:
+- Queues can be "locked"; locked queues are ignored by IO-driven output.
+- Cluster must be able to apply queue events from the cluster to a queue.
+ These appear to fit into existing queue operations.
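+
+By way of illustration only, such an interface might expose call
+points along these lines (hypothetical names and signatures, analogous
+to MessageStore, not the final API):
+
+  #include <cstdint>
+  #include <string>
+
+  class Cluster {
+    public:
+      virtual ~Cluster() {}
+
+      // Wiring changes.
+      virtual void queueDeclare(const std::string& queue) = 0;
+      virtual void exchangeDeclare(const std::string& exchange) = 0;
+      virtual void bind(const std::string& queue, const std::string& exchange,
+                        const std::string& key) = 0;
+
+      // Message life cycle; a message is identified by queue name + position.
+      virtual void enqueue(const std::string& queue, uint64_t position,
+                           const std::string& encodedMessage) = 0;
+      virtual void acquire(const std::string& queue, uint64_t position) = 0;
+      virtual void release(const std::string& queue, uint64_t position) = 0;
+      virtual void reject(const std::string& queue, uint64_t position) = 0;
+      virtual void dequeue(const std::string& queue, uint64_t position) = 0;
+  };
+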
** Maintainability
@@ -270,48 +273,106 @@ A number of specific ways the code will be simplified:
** Performance
-The standalone broker processes _connections_ concurrently, so CPU
-usage increases as you add more connections.
-
-The new cluster processes _queues_ concurrently, so CPU usage increases as you
-add more queues.
-
-In both cases, CPU usage peaks when the number of "units of
- concurrency" (connections or queues) goes above the number of cores.
-
-When all consumers on a queue are connected to the same broker the new
-cluster uses the same messagea allocation threading/logic as a
-standalone broker, with a little extra asynchronous book-keeping.
-
-If a queue has multiple consumers connected to multiple brokers, the
-new cluster time-shares the queue which is less efficient than having
-all consumers on a queue connected to the same broker.
+The only way to verify the relative performance of the new design is
+to prototype & profile. The following points suggest the new design
+may scale/perform better:
+
+Some work moved from virtual synchrony thread to connection threads:
+- All connection/session logic moves to connection thread.
+- Exchange routing logic moves to connection thread.
+- On the local broker dequeueing is done in the connection thread.
+- Local broker dequeue is IO driven as for a standalone broker.
+
+For queues with all consumers on a single node dequeue is entirely
+IO-driven in the connection thread. We only pay for time-sharing if a
+queue has consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
+decode/re-encode by re-using some of the original encoded data; this
+could also benefit the stand-alone broker.
+
+** Asynchronous queue replication
+
+The existing "asynchronous queue replication" feature maintains a
+passive backup of queues on a remote broker over a TCP
+connection.
+
+The new cluster replication protocol could be re-used to implement
+asynchronous queue replication: it's just a special case where the
+active broker is always the queue owner and the enqueue/dequeue
+messages are sent over a TCP connection rather than multicast.
+
+The new update mechanism could also work with 'asynchronous
+queue replication', allowing such replication (over a TCP connection
+on a WAN say) to be initiated after the queue had already been created
+and been in use (one of the key missing features).
+
+** Increasing concurrency and load sharing
+
+The current cluster is bottlenecked by processing everything in the
+CPG deliver thread. By removing the need for identical operation on
+each broker, we open up the possibility of greater concurrency.
+
+Handling multicast enqueue, acquire, accept, release etc: concurrency
+per queue. Operations on different queues can be done in different
+threads.
+
+The new design does not force each broker to do all the work in the
+CPG thread so spreading load across cluster members should give some
+scale-up.
+
+** Misc outstanding issues & notes
+
+Replicating wiring
+- Need async completion of wiring commands?
+- qpid.sequence_counter: need extra work to support in new design, do we care?
+
+Cluster+persistence:
+- finish async completion: dequeue completion for store & cluster
+- cluster restart from store: clean stores *not* identical, pick 1, all others update.
+- need to generate cluster ids for messages recovered from store.
+
+Live updates: we don't need to stall brokers during an update!
+- update on queue-by-queue basis.
+- updatee locks queues during update, no dequeue.
+- update in reverse: don't update messages dequeued during update.
+- updatee adds update messages at front, replicated messages at back (as normal).
+- updater starts from back, sends "update done" when it hits front of queue.
+
+Flow control: need to throttle multicasting
+1. bound the number of outstanding multicasts.
+2. ensure the entire cluster keeps up, no unbounded "lag"
+The existing design uses read-credit to solve 1., and does not solve 2.
+New design should stop reading on all connections while flow control
+condition exists?
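+
+A sketch of point 1, bounding the number of outstanding multicasts
+with a simple credit counter (hypothetical class; release() would be
+driven by CPG self-delivery of the multicast):
+
+  #include <condition_variable>
+  #include <mutex>
+
+  class MulticastCredit {
+      std::mutex lock;
+      std::condition_variable available;
+      unsigned credit;
+    public:
+      explicit MulticastCredit(unsigned limit) : credit(limit) {}
+
+      // Take one credit before multicasting; blocks while too many multicasts
+      // are outstanding (this is where "stop reading connections" would hook in).
+      void acquire() {
+          std::unique_lock<std::mutex> l(lock);
+          while (credit == 0) available.wait(l);
+          --credit;
+      }
+
+      // Return credit when the multicast is self-delivered, i.e. no longer outstanding.
+      void release() {
+          std::lock_guard<std::mutex> l(lock);
+          ++credit;
+          available.notify_one();
+      }
+  };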
+
+Can federation also be unified, at least in configuration?
+
+Consider queues (and exchanges?) as having "reliability" attributes:
+- persistent: is the message stored on disk.
+- backed-up (to another broker): active/passive async replication.
+- replicated (to a cluster): active/active multicast replication to cluster.
+- federated: federation link to a queue/exchange on another broker.
+
+"Reliability" seems right for the first 3 but not for federation, is
+there a better term?
+
+Clustering and scalability: new design may give us the flexibility to
+address scalability as part of cluster design. Think about
+relationship to federation and "fragmented queues" idea.
+
+* Design debates/decisions
-** Flow control
-New design does not queue up CPG delivered messages, they are
-processed immediately in the CPG deliver thread. This means that CPG's
-flow control is sufficient for qpid.
-
-** Live upgrades
-
-Live upgrades refers to the ability to upgrade a cluster while it is
-running, with no downtime. Each brokers in the cluster is shut down,
-and then re-started with a new version of the broker code.
-
-To achieve this
-- Cluster protocl XML file has a new element <version number=N> attached
- to each method. This is the version at which the method was added.
-- New versions can only add methods, existing methods cannot be changed.
-- The cluster handshake for new members includes the protocol version
- at each member.
-- The cluster's version is the lowest version among its members.
-- A newer broker can join and older cluster. When it does, it must restrict
- itself to speaking the older version protocol.
-- When the cluster version increases (because the lowest version member has left)
- the remaining members may move up to the new version.
-
-
-* Design debates
** Active/active vs. active/passive
An active-active cluster can be used in an active-passive mode. In
@@ -324,7 +385,7 @@ An active/passive implementation allows some simplifications over active/active:
- can do immediate local enqueue and still guarantee order.
Active/passive introduces a few extra requirements:
-- Exactly one broker has to take over if primary fails.
+- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.
@@ -332,17 +393,43 @@ Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover
- Backup brokers are immediately available on failover.
-- As long as a client can connect to any broker in the cluster, it can be served.
+- Some load sharing: reading from the client and multicast are only done on the directly connected node.
+
+Active/active drawbacks:
+- Co-ordinating message acquisition may impact performance (not tested)
+- Code may be more complex than active/passive.
Active/passive benefits:
-- Don't need to replicate message allocation, can feed consumers at top speed.
+- Don't need message allocation strategy, can feed consumers at top speed.
+- Code may be simpler than active/active.
Active/passive drawbacks:
- All clients on one node so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, may involve multiple connect attempts.
-- No service if a partition separates a client from the active broker,
- even if the client can see other brokers.
+** Total ordering.
+
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
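+
+A sketch of that identifier (struct name hypothetical):
+
+  #include <cstdint>
+  #include <string>
+
+  struct MessageId {
+      std::string queue;    // queue name
+      uint64_t    position; // position in that queue; the same on every broker
+                            // because CPG total order gives identical enqueue order
+
+      bool operator<(const MessageId& other) const {   // usable as a map key
+          if (queue != other.queue) return queue < other.queue;
+          return position < other.position;
+      }
+  };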
+
+** Message rejection
+
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+New thinking: treat re-queueing after reject as an entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.