Diffstat (limited to 'cpp/design_docs')
-rw-r--r--  cpp/design_docs/hot-standby-design.txt  239
-rw-r--r--  cpp/design_docs/new-cluster-design.txt  285
-rw-r--r--  cpp/design_docs/new-cluster-plan.txt    545
3 files changed, 570 insertions, 499 deletions
diff --git a/cpp/design_docs/hot-standby-design.txt b/cpp/design_docs/hot-standby-design.txt
deleted file mode 100644
index 99a5dc0199..0000000000
--- a/cpp/design_docs/hot-standby-design.txt
+++ /dev/null
@@ -1,239 +0,0 @@
--*-org-*-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-* Another new design for Qpid clustering.
-
-For background see [[./new-cluster-design.txt]] which describes the issues
-with the old design and a new active-active design that could replace it.
-
-This document describes an alternative hot-standby approach.
-
-** Delivery guarantee
-
-We guarantee N-way redundant, at-least-once delivery. Once a message
-from a client has been acknowledged by the broker, it will be
-delivered even if N-1 brokers subsequently fail. There may be
-duplicates in the event of a failure. We don't create duplicates
-during normal operation (i.e. when no brokers have failed).
-
-This is the same guarantee as the old cluster and the alternative
-active-active design.
-
-** Active-active vs. hot standby (aka primary-backup)
-
-An active-active cluster allows clients to connect to any broker in
-the cluster. If a broker fails, clients can fail-over to any other
-live broker.
-
-A hot-standby cluster has only one active broker at a time (the
-"primary") and one or more brokers on standby (the "backups"). Clients
-are served only by the primary; clients that connect to a backup are
-redirected to the primary. The backups are kept up-to-date in real time
-by the primary; if the primary fails, a backup is elected to be the new
-primary.
-
-Aside: A cold-standby cluster is possible using a standalone broker,
-CMAN and shared storage. In this scenario only one broker runs at a
-time writing to a shared store. If it fails, another broker is started
-(by CMAN) and recovers from the store. This bears investigation but
-the store recovery time is probably too long for failover.
-
-** Why hot standby?
-
-Active-active has some advantages:
-- Finding a broker on startup or failover is simple, just pick any live broker.
-- All brokers are always running in active mode, there's no
-- Distributing clients across brokers gives better performance, but see [1].
-- A broker failure affects only clients connected to that broker.
-
-The main problem with active-active is co-ordinating consumers of the
-same queue on multiple brokers such that there are no duplicates in
-normal operation. There are 2 approaches:
-
-Predictive: each broker predicts which messages others will take. This
-was the main weakness of the old design, so it is not appealing.
-
-Locking: brokers "lock" a queue in order to take messages. This is
-complex to implement, and it's not straightforward to determine the most
-performant strategy for passing the lock.
-
-Hot-standby removes this problem. Only the primary can modify queues
-so it just has to tell the backups what it is doing, there's no
-locking.
-
-The primary can enqueue messages and replicate asynchronously -
-exactly like the store does, but it "writes" to the replicas over the
-network rather than writing to disk.
-
-** Failover in a hot-standby cluster.
-
-Hot-standby has some potential performance issues around failover:
-
-- Failover "spike": when the primary fails every client will fail over
- at the same time, putting strain on the system.
-
-- Until a new primary is elected, the cluster cannot serve any clients or
-  redirect clients to the primary.
-
-We want to minimize the number of re-connect attempts that clients
-have to make. The cluster can use a well-known algorithm to choose the
-new primary (e.g. round robin on a known sequence of brokers) so that
-clients can guess the new primary correctly in most cases.
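-
-As a sketch (the broker list and helper names are illustrative, not part
-of any Qpid client API), the client-side guess could be as simple as:
-
-#include <cstddef>
-#include <string>
-#include <vector>
-
-// Round robin over a well-known, ordered broker list, starting just
-// after the broker that was last known to be primary.
-std::string guessNextPrimary(const std::vector<std::string>& brokers,
-                             std::size_t lastPrimaryIndex) {
-    return brokers[(lastPrimaryIndex + 1) % brokers.size()];
-}
-
-int main() {
-    std::vector<std::string> brokers;
-    brokers.push_back("amqp:host1:5672");
-    brokers.push_back("amqp:host2:5672");
-    brokers.push_back("amqp:host3:5672");
-    // If host2 (index 1) was primary and has failed, try host3 first.
-    return guessNextPrimary(brokers, 1).empty() ? 1 : 0;
-}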
-
-Even if clients do guess correctly, it may be that the new primary is
-not yet aware of the death of the old primary, which may cause
-multiple failed connection attempts before clients eventually get
-connected. We will need to prototype to see how much this happens in
-reality and how we can best get clients redirected.
-
-** Threading and performance.
-
-The primary-backup cluster operates analogously to the way the disk store does now:
-- use the same MessageStore interface as the store to interact with the broker
-- use the same asynchronous-completion model for replicating messages.
-- use the same recovery interfaces (?) for new backups joining.
-
-Re-using the well-established store design gives credibility to the new cluster design.
-
-The single CPG dispatch thread was a severe performance bottleneck for the old cluster.
-
-The primary has the same threading model as a standalone broker with
-a store, which we know performs well.
-
-If we use CPG for replication of messages, the backups will receive
-messages in the CPG dispatch thread. To get more concurrency, the CPG
-thread can dump work onto internal PollableQueues to be processed in
-parallel.
-
-Messages from the same broker queue need to go onto the same
-PollableQueue. There could be a separate PollableQueue for each broker
-queue. If that's too resource intensive we can use a fixed set of
-PollableQueues and assign broker queues to PollableQueues via hashing
-or round robin.
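-
-A sketch of the hashing option (a plain struct stands in for
-qpid::sys::PollableQueue; only the assignment logic is the point):
-
-#include <cstddef>
-#include <functional>
-#include <string>
-#include <vector>
-
-struct WorkQueue { /* stand-in for a PollableQueue worker */ };
-
-// Hash the broker queue name onto a fixed pool of workers, so all
-// events for a given broker queue are processed in order by one worker.
-WorkQueue& workerFor(const std::string& queueName,
-                     std::vector<WorkQueue>& workers) {
-    std::size_t h = std::hash<std::string>()(queueName);
-    return workers[h % workers.size()];
-}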
-
-Another possible optimization is to use multiple CPG groups: one per
-queue or a hashed set, to get more concurrency in the CPG layer. The
-old cluster is not able to keep CPG busy.
-
-TODO: Transactions pose a challenge with these concurrent models: how
-to co-ordinate multiple messages being added (commit a publish or roll
-back an accept) to multiple queues so that all replicas end up with
-the same message sequence while respecting atomicity.
-
-** Use of CPG
-
-CPG provides several benefits in the old cluster:
-- tracking membership (essential for determining the primary)
-- handling "spit brain" (integrates with partition support from CMAN)
-- reliable multicast protocol to distribute messages.
-
-I believe we still need CPG for membership and split brain. We could
-experiment with sending the bulk traffic over AMQP connections.
-
-** Flow control
-
-Need to ensure that:
-1) In-memory internal queues used by the cluster don't overflow.
-2) The backups don't fall too far behind on processing CPG messages.
-
-** Recovery
-When a new backup joins an active cluster it must get a snapshot
-from one of the other backups, or from the primary if there are none. In
-store terms this is "recovery" (the old cluster called it an "update").
-
-Compared to the old cluster we only replicate the well-defined data set of the store.
-This was a crucial sore spot of the old cluster.
-
-We can also replicate it more efficiently by recovering queues in
-reverse (LIFO) order. That means that as clients actively consume messages
-from the front of the queue, they are reducing the work we have to do
-in recovering from the back. (NOTE: this may not be compatible with
-using the same recovery interfaces as the store.)
-
-** Selective replication
-In this model it's easy to support selective replication of individual queues via
-configuration.
-- Explicit exchange/queue declare argument and message boolean: x-qpid-replicate.
- Treated analogously to persistent/durable properties for the store.
-- if not explicitly marked, provide a choice of default
- - default is replicate (replicated message on replicated queue)
- - default is don't replicate
- - default is replicate persistent/durable messages.
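-
-A sketch of that decision logic (the enum and argument encoding are
-assumptions for illustration, not a settled interface):
-
-// explicitArg: -1 = x-qpid-replicate not set, 0 = false, 1 = true.
-enum ReplicateDefault { REPLICATE_ALL, REPLICATE_NONE, REPLICATE_DURABLE };
-
-bool shouldReplicate(int explicitArg, bool durable, ReplicateDefault def) {
-    if (explicitArg >= 0) return explicitArg == 1;   // explicit marking wins
-    switch (def) {
-      case REPLICATE_ALL:     return true;
-      case REPLICATE_NONE:    return false;
-      case REPLICATE_DURABLE: return durable;
-    }
-    return true;  // unreachable; keeps compilers happy
-}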
-
-** Inconsistent errors
-
-The new design eliminates most sources of inconsistent errors in the
-old design (connections, sessions, security, management etc.) and
-eliminates the need to stall the whole cluster till an error is
-resolved. We still have to handle inconsistent store errors when store
-and cluster are used together.
-
-We also have to include error handling in the async completion loop to
-guarantee N-way at-least-once delivery: we should only report success to the
-client when we know the message was replicated and stored on all N-1
-backups.
-
-TODO: We have a lot more options than the old cluster, need to figure
-out the best approach, or possibly allow multiple approaches. Need to
-go through the various failure cases. We may be able to do recovery on a
-per-queue basis rather than restarting an entire node.
-
-** New members joining
-
-We should be able to catch up much faster than the old design. A
-new backup can catch up ("recover") the current cluster state on a
-per-queue basis.
-- queues can be updated in parallel
-- "live" updates avoid the the "endless chase"
-
-During a "live" update several things are happening on a queue:
-- clients are publishing messages to the back of the queue, replicated to the backup
-- clients are consuming messages from the front of the queue, replicated to the backup.
-- the primary is sending pre-existing messages to the new backup.
-
-The primary sends pre-existing messages in LIFO order - starting from
-the back of the queue - while at the same time clients are consuming from the front.
-The active consumers actually reduce the amount of work to be done, as there's
-no need to replicate messages that are no longer on the queue.
-
-* Steps to get there
-
-** Baseline replication
-Validate the overall design and get an initial notion of performance. Just
-message+wiring replication, no update/recovery for new members joining,
-single CPG dispatch thread on backups, no failover, no transactions.
-
-** Failover
-Electing the primary, backups redirecting to the primary. Measure failover time
-for a large number of clients. Strategies to minimize the number of retries after a
-failure.
-
-** Flow Control
-Keep internal queues from overflowing. Similar to internal flow control in the old cluster.
-Needed for realistic performance/stress tests.
-
-** Concurrency
-Experiment with multiple threads on backups, multiple CPG groups.
-
-** Recovery/new member joining
-Initial status handshake for new member. Recovering queues from the back.
-
-** Transactions
-TODO: How to implement transactions with concurrency. Worst solution:
-a global --cluster-use-transactions flag that forces single thread
-mode. Need to find a better solution.
diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt
index a162ea68ec..7adb46fee3 100644
--- a/cpp/design_docs/new-cluster-design.txt
+++ b/cpp/design_docs/new-cluster-design.txt
@@ -17,6 +17,7 @@
# under the License.
* A new design for Qpid clustering.
+
** Issues with current design.
The cluster is based on virtual synchrony: each broker multicasts
@@ -94,9 +95,8 @@ Use a moving queue ownership protocol to agree order of dequeues.
No longer relies on identical state and lock-step behavior to cause
identical dequeues on each broker.
-Use multiple CPG groups to process different queues in parallel. Use a
-fixed set of groups and hash queue names to choose the group for each
-queue.
+Each queue has an associated thread context. Events for a queue are executed
+in that queue's context, in parallel with events for other queues.
*** Requirements
@@ -149,7 +149,7 @@ a release-queue event, allowing another interested broker to take
ownership.
*** Asynchronous completion of accept
-
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.
@@ -162,32 +162,19 @@ On receiving an accept the broker:
NOTE: The message store does not currently implement asynchronous
completions of accept, this is a bug.
-*** Multiple CPG groups.
-
-The old cluster was bottlenecked by processing everything in a single
-CPG deliver thread.
-
-The new cluster uses a set of CPG groups, one per core. Queue names
-are hashed to give group indexes, so statistically queues are likely
-to be spread over the set of groups.
-
-Operations on a given queue always use the same group, so we have
-order within each queue, but operations on different queues can use
-different groups giving greater throughput sending to CPG and multiple
-handler threads to process CPG messages.
-
** Inconsistent errors.
-An inconsistent error means that after multicasting an enqueue, accept
-or dequeue, some brokers succeed in processing it and others fail.
+The new design eliminates most sources of inconsistent errors
+(connections, sessions, security, management etc.) The only points
+where inconsistent errors can occur are at enqueue and dequeue (most
+likely store-related errors.)
-The new design eliminates most sources of inconsistent errors in the
-old broker: connections, sessions, security, management etc. Only
-store journal errors remain.
+The new design can use the existing error-handling protocol with one
+major improvement: since brokers are no longer required to maintain
+identical state they do not have to stall processing while an error is
+being resolved.
-The new inconsistent error protocol is similar to the old one with one
-major improvement: brokers do not have to stall processing while an
-error is being resolved.
+#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
** Updating new members
@@ -206,44 +193,60 @@ catch up (which is not guaranteed to happen in a bounded time.)
With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.
-We update individual objects (queues and exchanges) independently.
-- create queues first, then update all queues and exchanges in parallel.
-- multiple updater threads, per queue/exchange.
+Update of wiring (exchanges, queues, bindings) is the same as current
+design.
+
+Update of messages is different:
+- per-queue rather than per-broker, separate queues can be updated in parallel.
+- updates queues in reverse order to eliminate unbounded catch-up
+- does not require updater & updatee to stall during update.
-Queue updater:
-- marks the queue position at the sync point
-- sends messages starting from the sync point working towards the head of the queue.
-- send "done" message.
+Replication events, multicast to cluster:
+- enqueue(q,m): message m pushed on back of queue q.
+- acquire(q,m): mark m acquired
+- dequeue(q,m): forget m.
+Messages sent on update connection:
+- update_front(q,m): during update, receiver pushes m to *front* of q
+- update_done(q): during update, update of q is complete.
-Queue updatee:
-- enqueues received from CPG: add to back of queue as normal.
-- dequeues received from CPG: apply if found, else save to check at end of update.
-- messages from updater: add to the *front* of the queue.
-- update complete: apply any saved dequeues.
+Updater:
+- when updatee joins set iterator i = q.end()
+- while i != q.begin(): --i; send update_front(q,*i) to updatee
+- send update_done(q) to updatee
-Exchange updater:
-- updater: send snapshot of exchange as it was at the sync point.
+Updatee:
+- q initially in locked state, can't dequeue locally.
+- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
+- receive update_front(q,m): q.push_front(m)
+- receive update_done(q): q can be unlocked for local dequeuing.
-Exchange updatee:
-- queue exchange operations after the sync point.
-- when snapshot is received: apply saved operations.
+Benefits:
+- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
+- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
+- During update consumers actually help by removing messages before they need to be updated.
+- Needs no separate "work to do" queue, only the broker queues themselves.
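+
+A minimal sketch of the Updater/Updatee steps above, using std::deque as a
+stand-in for the broker queue and direct calls in place of the update
+connection (illustrative only, not the broker classes):
+
+#include <deque>
+#include <string>
+
+typedef std::string Message;
+
+// Updatee: pre-existing messages arrive via update_front and go on the
+// *front*; normal replicated enqueues go on the back as usual.
+struct Updatee {
+    std::deque<Message> q;
+    bool locked;                 // no local dequeue until update_done
+    Updatee() : locked(true) {}
+    void update_front(const Message& m) { q.push_front(m); }
+    void update_done() { locked = false; }
+    void enqueue(const Message& m) { q.push_back(m); }
+};
+
+// Updater: walk the queue from back to front, then signal completion.
+// Messages consumed before they are reached never need to be sent.
+void update(const std::deque<Message>& q, Updatee& u) {
+    for (std::deque<Message>::const_reverse_iterator i = q.rbegin();
+         i != q.rend(); ++i)
+        u.update_front(*i);
+    u.update_done();
+}
+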
-Note:
-- Updater is active throughout, no stalling.
-- Consuming clients actually reduce the size of the update.
-- Updatee stalls clients until the update completes.
- (Note: May be possible to avoid updatee stall as well, needs thought)
+# TODO how can we recover from updater crashing before update complete?
+# Clear queues that are not updated & send request for updates on those queues?
-** Internal cluster interface
+# TODO updatee may receive a dequeue for a message it has not yet seen, needs
+# to hold on to that so it can drop the message when it is seen.
+# Similar problem exists for wiring?
-The new cluster interface is similar to the MessageStore interface, but
-provides more detail (message positions) and some additional call
-points (e.g. acquire)
+** Cluster API
+
+The new cluster API is similar to the MessageStore interface.
+(Initially I thought it would be an extension of the MessageStore interface,
+but as the design develops it seems better to make it a separate interface.)
The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.
-- transactional events.
+
+The cluster will require some extensions to the Queue:
+- Queues can be "locked", locked queues are ignored by IO-driven output.
+- Cluster must be able to apply queue events from the cluster to a queue.
+ These appear to fit into existing queue operations.
** Maintainability
@@ -270,48 +273,106 @@ A number of specific ways the code will be simplified:
** Performance
-The standalone broker processes _connections_ concurrently, so CPU
-usage increases as you add more connections.
-
-The new cluster processes _queues_ concurrently, so CPU usage increases as you
-add more queues.
-
-In both cases, CPU usage peaks when the number of "units of
- concurrency" (connections or queues) goes above the number of cores.
-
-When all consumers on a queue are connected to the same broker the new
-cluster uses the same messagea allocation threading/logic as a
-standalone broker, with a little extra asynchronous book-keeping.
-
-If a queue has multiple consumers connected to multiple brokers, the
-new cluster time-shares the queue which is less efficient than having
-all consumers on a queue connected to the same broker.
+The only way to verify the relative performance of the new design is
+to prototype & profile. The following points suggest the new design
+may scale/perform better:
+
+Some work moved from virtual synchrony thread to connection threads:
+- All connection/session logic moves to connection thread.
+- Exchange routing logic moves to connection thread.
+- On local broker dequeueing is done in connection thread
+- Local broker dequeue is IO driven as for a standalone broker.
+
+For queues with all consumers on a single node dequeue is all
+IO-driven in connection thread. Pay for time-sharing only if queue has
+consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance: there is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
+decode/re-encode by re-using some of the original encoded data, this
+could also benefit the stand-alone broker.
+
+** Asynchronous queue replication
+
+The existing "asynchronous queue replication" feature maintains a
+passive backup of queues on a remote broker over a TCP
+connection.
+
+The new cluster replication protocol could be re-used to implement
+asynchronous queue replication: it's just a special case where the
+active broker is always the queue owner and the enqueue/dequeue
+messages are sent over a TCP connection rather than multicast.
+
+The new update mechanism could also work with 'asynchronous
+queue replication', allowing such replication (over a TCP connection
+on a WAN say) to be initiated after the queue had already been created
+and been in use (one of the key missing features).
+
+** Increasing Concurrency and load sharing
+
+The current cluster is bottlenecked by processing everything in the
+CPG deliver thread. By removing the need for identical operation on
+each broker, we open up the possibility of greater concurrency.
+
+Handling multicast enqueue, acquire, accept, release etc: concurrency
+per queue. Operations on different queues can be done in different
+threads.
+
+The new design does not force each broker to do all the work in the
+CPG thread so spreading load across cluster members should give some
+scale-up.
+
+** Misc outstanding issues & notes
+
+Replicating wiring
+- Need async completion of wiring commands?
+- qpid.sequence_counter: need extra work to support in new design, do we care?
+
+Cluster+persistence:
+- finish async completion: dequeue completion for store & cluster
+- cluster restart from store: clean stores *not* identical, pick 1, all others update.
+- need to generate cluster ids for messages recovered from store.
+
+Live updates: we don't need to stall brokers during an update!
+- update on queue-by-queue basis.
+- updatee locks queues during update, no dequeue.
+- update in reverse: don't update messages dequeued during update.
+- updatee adds update messages at front (as normal), replicated messages at back.
+- updater starts from back, sends "update done" when it hits front of queue.
+
+Flow control: need to throttle multicasting
+1. bound the number of outstanding multicasts.
+2. ensure the entire cluster keeps up, no unbounded "lag"
+The existing design uses read-credit to solve 1., and does not solve 2.
+New design should stop reading on all connections while flow control
+condition exists?
+
+Can federation also be unified, at least in configuration?
+
+Consider queues (and exchanges?) as having "reliability" attributes:
+- persistent: is the message stored on disk.
+- backed-up (to another broker): active/passive async replication.
+- replicated (to a cluster): active/active multicast replication to cluster.
+- federated: federation link to a queue/exchange on another broker.
+
+"Reliability" seems right for the first 3 but not for federation, is
+there a better term?
+
+Clustering and scalability: new design may give us the flexibility to
+address scalability as part of cluster design. Think about
+relationship to federation and "fragmented queues" idea.
+
+* Design debates/decisions
-** Flow control
-New design does not queue up CPG delivered messages, they are
-processed immediately in the CPG deliver thread. This means that CPG's
-flow control is sufficient for qpid.
-
-** Live upgrades
-
-Live upgrades refers to the ability to upgrade a cluster while it is
-running, with no downtime. Each brokers in the cluster is shut down,
-and then re-started with a new version of the broker code.
-
-To achieve this
-- Cluster protocl XML file has a new element <version number=N> attached
- to each method. This is the version at which the method was added.
-- New versions can only add methods, existing methods cannot be changed.
-- The cluster handshake for new members includes the protocol version
- at each member.
-- The cluster's version is the lowest version among its members.
-- A newer broker can join and older cluster. When it does, it must restrict
- itself to speaking the older version protocol.
-- When the cluster version increases (because the lowest version member has left)
- the remaining members may move up to the new version.
-
-
-* Design debates
** Active/active vs. active passive
An active-active cluster can be used in an active-passive mode. In
@@ -324,7 +385,7 @@ An active/passive implementation allows some simplifications over active/active:
- can do immediate local enqueue and still guarantee order.
Active/passive introduces a few extra requirements:
-- Exactly one broker has to take over if primary fails.
+- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.
@@ -332,17 +393,43 @@ Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover
- Backup brokers are immediately available on failover.
-- As long as a client can connect to any broker in the cluster, it can be served.
+- Some load sharing: reading from the client and multicasting are done only on the directly connected node.
+
+Active/active drawbacks:
+- Co-ordinating message acquisition may impact performance (not tested)
+- Code may be more complex than active/passive.
Active/passive benefits:
-- Don't need to replicate message allocation, can feed consumers at top speed.
+- Don't need message allocation strategy, can feed consumers at top speed.
+- Code may be simpler than active/active.
Active/passive drawbacks:
- All clients on one node so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, may involve multiple connect attempts.
-- No service if a partition separates a client from the active broker,
- even if the client can see other brokers.
+** Total ordering.
+
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Using total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
+** Message rejection
+
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+New thinking: treat re-queueing after reject as an entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.
diff --git a/cpp/design_docs/new-cluster-plan.txt b/cpp/design_docs/new-cluster-plan.txt
index 32e3f710e7..781876e55a 100644
--- a/cpp/design_docs/new-cluster-plan.txt
+++ b/cpp/design_docs/new-cluster-plan.txt
@@ -17,150 +17,376 @@
# specific language governing permissions and limitations
# under the License.
-* Status of impementation
-Meaning of priorities:
-[#A] Essential for basic functioning.
-[#B] Required for first release.
-[#C] Can be addressed in a later release.
+Notes on new cluster implementation. See also: new-cluster-design.txt
-The existig prototype is bare bones to do performance benchmarks:
-- Implements publish and consumer locking protocol.
-- Defered delivery and asynchronous completion of message.
-- Optimize the case all consumers are on the same node.
-- No new member updates, no failover updates, no transactions, no persistence etc.
+* Implementation plan.
-Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
-** Similarities to existing cluster.
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- optimize the special case where all consumers are on the same node.
+- measure performance: compare active-passive and active-active modes of use.
-/Active-active/: the new cluster can be a drop-in replacement for the
-old, existing tests & customer deployment configurations are still
-valid.
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
-/Virtual synchrony/: Uses corosync to co-ordinate activity of members.
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protocol etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster ID's to messages recovered from store, don't replicate.
-/XML controls/: Uses XML to define the primitives multicast to the
-cluster.
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
-** Differences with existing cluster.
+* Task list
-/Report rather than predict consumption/: brokers explicitly tell each
-other which messages have been acquired or dequeued. This removes the
-major cause of bugs in the existing cluster.
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
-/Queue consumer locking/: to avoid duplicates only one broker can acquire or
-dequeue messages at a time - while has the consume-lock on the
-queue. If multiple brokers are consuming from the same queue the lock
-is passed around to time-share access to the queue.
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
-/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
-the concurrency of the host) to allow concurrent processing on
-different queues. Queues are hashed onto the groups.
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued on other members.
+- acquire then reject: verify goes on alt-exchange once only.
-* Completed tasks
-** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
- CLOSED: [2011-10-05 Wed 16:03]
+*** DONE broker::Cluster interface and call points.
-Defines broker::Cluster interface and call points.
-Initial interface commite
+Initial interface committed.
-Main classes
-Core: central object holding cluster classes together (replaces cluster::Cluster)
-BrokerContext: implements broker::Cluster interface.
-QueueContext: Attached to a broker::Queue, holds cluster status.
-MessageHolder:holds local messages while they are being enqueued.
+*** Main classes
-Implements multiple CPG groups for better concurrency.
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
-** DONE [#A] Large message replication.
- CLOSED: [2011-10-05 Wed 17:22]
-Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame)
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and MessageHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
-* Open questions
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both MessageHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path"; error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
+
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
+
+Members:
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
+
+NOTE: localMessage is also modified by MessageHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg)
+ if noReplicate: return
+  # Suppress everything except enqueues while we are routing.
+  # We don't want to replicate acquires & dequeues caused by an enqueue,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+  if routingSequence == 0:  # thread local
+    routingSequence = nextRoutingSequence()
+  mcast create(encode(qmsg.msg), routingSequence)
+  mcast enqueue(qmsg.q, routingSequence)
+
+routed(msg):
+ if noReplicate: return
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast acquire(qmsg)
+
+release(QueuedMessage)
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast release(qmsg)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast accept(qmsg)
+
+reject(QueuedMessage):
+ isRejecting = true
+ mcast reject(qmsg)
+
+# FIXME no longer needed?
+drop(QueuedMessage)
+ cleanup(qmsg)
+
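+A small illustration of the thread_local suppression flags listed above, with
+an RAII guard so events applied locally on behalf of the cluster are not
+multicast again (names are made up for the sketch):
+
+thread_local bool noReplicate = false;  // applying a remotely-delivered event
+thread_local bool isRouting   = false;  // routing an enqueue, skip side effects
+
+// Guard used when calling into the broker to apply an event that was
+// already multicast, so the broker::Cluster intercepts do nothing.
+struct ScopedNoReplicate {
+    ScopedNoReplicate()  { noReplicate = true; }
+    ~ScopedNoReplicate() { noReplicate = false; }
+};
+
+// Example intercept body: replicate only client-driven acquires.
+void acquireIntercept(/* const QueuedMessage& qmsg */) {
+    if (noReplicate || isRouting) return;
+    // mcastAcquire(qmsg);  // multicast the acquire event here
+}
+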
+*** MessageHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq)
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+ enqueued[id,q] = (localMessage[seq], acquired=None)
+  else:
+    msg = sender.routing[seq]
+    with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+    enqueued[id,q] = (qmsg, acquired=None)
+
+routed(seq):
+  if sender == self: localMessage.erase(seq)
+ else: sender.routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q)
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self
+ with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q)
+ sender.routing.erase[id]
+
+dequeue(id,q)
+ entry = enqueued[id,q]
+ enqueued.erase[id,q]
+ node[entry.acquired].acquired.erase(id,q)
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+  if (consumersStopped) return false;
+  ++consumersBusy;
+  bool dispatched = do_regular_dispatch_body();
+  if (--consumersBusy == 0) consumersBusyMonitor.notify();
+  return dispatched;
+}
+
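+The same sketch with the omitted locking filled in, assuming a plain
+std::mutex/std::condition_variable rather than the broker's own Monitor
+class (illustrative only):
+
+#include <condition_variable>
+#include <mutex>
+
+class LockedQueue {
+    std::mutex lock;
+    std::condition_variable consumersBusyMonitor;
+    bool consumersStopped;
+    int consumersBusy;
+  public:
+    LockedQueue() : consumersStopped(false), consumersBusy(0) {}
+
+    void stopConsumers() {
+        std::unique_lock<std::mutex> l(lock);
+        consumersStopped = true;
+        while (consumersBusy > 0) consumersBusyMonitor.wait(l);
+    }
+
+    void startConsumers() {
+        std::lock_guard<std::mutex> l(lock);
+        consumersStopped = false;
+        // listeners.notify() would wake waiting consumers in the real broker.
+    }
+
+    bool dispatch() {
+        {
+            std::lock_guard<std::mutex> l(lock);
+            if (consumersStopped) return false;
+            ++consumersBusy;
+        }
+        bool dispatched = doRegularDispatchBody();  // stand-in for real dispatch
+        {
+            std::lock_guard<std::mutex> l(lock);
+            if (--consumersBusy == 0) consumersBusyMonitor.notify_all();
+        }
+        return dispatched;
+    }
+  private:
+    bool doRegularDispatchBody() { return true; }  // hypothetical body
+};
+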
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node in node-id order (treating nodes as a circular list)
+starting from the old owner that is interested in the queue.
+
+Queue consumers initially are stopped, only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
-** TODO [#A] Queue sequence numbers vs. independant message IDs.
- SCHEDULED: <2011-10-07 Fri>
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
-Current prototype uses queue sequence numbers to identify
-message. This is tricky for updating new members as the sequence
-numbers are only known on delivery.
+Multicast messages in QueueOwner class:
-Independent message IDs that can be generated and sent with the message simplify
-this and potentially allow performance benefits by relaxing total ordering.
-However they imply additional map lookups that might hurt performance.
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
-- [ ] Prototype independent message IDs, check performance.
+release(q):
+  assert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q]
+ if owner==self: ownership[q].take()
-* Outstanding Tasks
-** TODO [#A] Defer and async completion of wiring commands.
+cancel(q):
+ assert(queue[q].owner != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
-Testing requirement: Many tests assume wiring changes are visible
-across the cluster once the commad completes.
+member-leaves:
+  for q in queue: if owner[q] == left: left.release(q)
-Name clashes: need to avoid race if same name queue/exchange declared
-on 2 brokers simultaneously
+Need 2 more intercept points in broker::Cluster:
-** TODO [#A] Passing all existing cluster tests.
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
-The new cluster should be a drop-in replacement for the old, so it
-should be able to pass all the existing tests.
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
-** TODO [#A] Update to new members joining.
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
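+A sketch of the circular search in release(q) above: the new owner is the
+next interested node after the old owner, wrapping around (NodeId and the
+consumer set are stand-ins for the real types):
+
+#include <set>
+
+typedef unsigned long NodeId;
+
+// Treat the set of interested nodes as a circular list ordered by node id
+// and return the next node after 'from'. Assumes 'consumers' is non-empty;
+// if 'from' is the only member it becomes owner again.
+NodeId nextOwner(const std::set<NodeId>& consumers, NodeId from) {
+    std::set<NodeId>::const_iterator i = consumers.upper_bound(from);
+    if (i == consumers.end()) i = consumers.begin();  // wrap around
+    return *i;
+}
+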
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would
+- build messages and associate with cluster IDs.
+- push queue ops to the appropriate PollableQueue to be dispatched in the queue's thread.
+
+Serializing operations on the same queue avoids contention, while taking
+advantage of the independence of operations on separate queues.
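+
+A simplified sketch of the data flow (std::function and a mutex-protected
+deque stand in for PollableQueue and the Poller; illustrative only):
+
+#include <deque>
+#include <functional>
+#include <map>
+#include <mutex>
+#include <string>
+
+// One serialized work queue ("context") per broker queue.
+struct QueueContext {
+    std::mutex lock;
+    std::deque<std::function<void()> > ops;  // enqueue/acquire/accept ...
+    void push(const std::function<void()>& op) {
+        std::lock_guard<std::mutex> l(lock);
+        ops.push_back(op);
+        // The real broker would now wake the poller so 'ops' is drained
+        // in this queue's thread context.
+    }
+};
+
+// CPG deliver thread: route each delivered event to its queue's context,
+// so work for different queues can proceed in parallel.
+class Dispatcher {
+    std::map<std::string, QueueContext> contexts;
+  public:
+    void deliver(const std::string& queueName, const std::function<void()>& op) {
+        contexts[queueName].push(op);
+    }
+};
+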
-Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
-- implicit sequence numbers are more tricky to replicate to new member.
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
-Update individual objects (queues and exchanges) independently.
-- create queues first, then update all queues and exchanges in parallel.
-- multiple updater threads, per queue/exchange.
-- updater sends messages to special exchange(s) (not using extended AMQP controls)
+** TODO [#B] Large message replication.
+Multicast should encode messages in fixed size buffers (64k)?
+Can't assume we can send a message in one chunk.
+For 0-10 can use channel numbers & send whole frames packed into larger buffer.
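+
+A sketch of the fragmentation (the 64k bound and Fragment type are
+assumptions for illustration):
+
+#include <algorithm>
+#include <cstddef>
+#include <vector>
+
+static const std::size_t MAX_MCAST_BODY = 64 * 1024;
+
+struct Fragment {
+    bool last;               // true on the final fragment of the message
+    std::vector<char> body;  // at most MAX_MCAST_BODY bytes
+};
+
+// Split an encoded message into fragments that fit one multicast buffer;
+// the receiver reassembles them in delivery order until 'last' is seen.
+std::vector<Fragment> fragment(const std::vector<char>& encoded) {
+    std::vector<Fragment> frags;
+    std::size_t offset = 0;
+    do {
+        std::size_t n = std::min(encoded.size() - offset, MAX_MCAST_BODY);
+        Fragment f;
+        f.body.assign(encoded.begin() + offset, encoded.begin() + offset + n);
+        offset += n;
+        f.last = (offset == encoded.size());
+        frags.push_back(f);
+    } while (offset < encoded.size());
+    return frags;
+}
+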
+** TODO [#B] Transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Sequence number to generate per-node tx IDs.
+Replicate transaction completion.
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
-Queue updater:
-- marks the queue position at the sync point
-- sends messages starting from the sync point working towards the head of the queue.
-- send "done" message.
-Note: updater remains active throughout, consuming clients actually reduce the
-size of the update.
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
-Queue updatee:
-- enqueues received from CPG: add to back of queue as normal.
-- dequeues received from CPG: apply if found, else save to check at end of update.
-- messages from updater: add to the *front* of the queue.
-- update complete: apply any saved dequeues.
+Async callback uses *requestIOProcessing* to queue action on IO thread.
-Exchange updater:
-- updater: send snapshot of exchange as it was at the sync point.
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
-Exchange updatee:
-- queue exchange operations after the sync point.
-- when snapshot is received: apply saved operations.
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
-Updater remains active throughout.
-Updatee stalls clients until the update completes.
+** TODO [#B] New members joining - first pass
-Updating queue/exchange/binding objects is via the same encode/decode
-that is used by the store. Updatee to use recovery interfaces to
-recover?
+Re-use update code from old cluster but don't replicate sessions &
+connections.
-** TODO [#A] Failover updates to client.
-Implement the amq.failover exchange to notify clients of membership.
+Need to extend it to send cluster IDs with messages.
-** TODO [#B] Initial status protocol.
-Handshake to give status of each broker member to new members joining.
-Status includes
-- persistent store state (clean, dirty)
-- cluster protocol version.
+Need to replicate the queue ownership data as part of the update.
-** TODO [#B] Persistent cluster support.
-Initial status protoocl to support persistent start-up (see existing code)
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code)
Only one broker recovers from store, update to others.
Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
-** TODO [#B] Management support
-Replicate management methods that modify queues - e.g. move, purge.
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are taken off a queue.
+
+NOTE: Not controlled by queue lock, how to make them consistent?
+
Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
@@ -169,38 +395,66 @@ Target broker may not have all messages on other brokers for purge/destroy.
Need to add callpoints & mcast messages to replicate these?
-** TODO [#B] TX transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Running brokers exchange TX information.
-New broker update includes TX information.
-
- // FIXME aconway 2010-10-18: As things stand the cluster is not
- // compatible with transactions
- // - enqueues occur after routing is complete
- // - no call to Cluster::enqueue, should be in Queue::process?
- // - no transaction context associated with messages in the Cluster interface.
- // - no call to Cluster::accept in Queue::dequeueCommitted
-
-** TODO [#B] DTX transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Running brokers exchange DTX information.
-New broker update includes DTX information.
-
-** TODO [#B] Async completion of accept.
-When this is fixed in the standalone broker, it should be fixed for cluster.
-
-** TODO [#B] Network partitions and quorum.
-Re-use existing implementation.
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting, we still have to deal with remote
+multicasting (note existing cluster does not do this.) Something like:
+- when over bounds multicast a flow-control event.
+- on delivery of flow-control all members stop polling to read client connections
+- when back under bounds send flow-control-end, all members resume
+- if the flow-controlling member dies, others resume
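+
+A sketch of point 1 (bounding outstanding multicasts); the count is
+decremented when our own event is self-delivered, and client reads are
+paused while over the bound (names are assumptions, locking omitted):
+
+#include <cstddef>
+
+class McastFlowControl {
+    std::size_t outstanding;   // multicast but not yet self-delivered
+    std::size_t bound;
+    bool paused;
+  public:
+    explicit McastFlowControl(std::size_t b)
+        : outstanding(0), bound(b), paused(false) {}
+
+    // Connection thread: called just before multicasting an event.
+    void sent() {
+        if (++outstanding > bound && !paused) {
+            paused = true;
+            // stopReadingClientConnections();  // hypothetical hook
+        }
+    }
+
+    // CPG deliver thread: called when one of our own events is delivered.
+    void selfDelivered() {
+        if (--outstanding <= bound && paused) {
+            paused = false;
+            // startReadingClientConnections();  // hypothetical hook
+        }
+    }
+};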
+
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#B] Better concurrency, scalability on multi-cores.
+Introduce PollableQueue of operations per broker queue. Queue up mcast
+operations (enqueue, acquire, accept etc.) to be handled concurrently
+on different queues. Performance testing to verify improved scalability.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
** TODO [#C] Allow non-replicated exchanges, queues.
-Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects.
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
- save replicated status to store.
- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
Replicated queue: replicate all messages.
-Replicated exchange: replicate bindings to replicated queues only.
-Configurable default? Defaults to true.
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
** TODO [#C] Refactoring of common concerns.
@@ -215,40 +469,9 @@ Look for ways to capitalize on the similarity & simplify the code.
In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
-
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are being processed in the CPG deliver thread context which
+serializes all the work. We only need ordering on a per queue basis, can we
+enqueue in parallel on different queues and will that improve performance?
** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in decision to deliver an immediate message?
-** TODO [#C] Remove old cluster hacks and workarounds
-The old cluster has workarounds in the broker code that can be removed.
-- [ ] drop code to replicate management model.
-- [ ] drop timer workarounds for TTL, management, heartbeats.
-- [ ] drop "cluster-safe assertions" in broker code.
-- [ ] drop connections, sessions, management from cluster update.
-- [ ] drop security workarounds: cluster code now operates after message decoding.
-- [ ] drop connection tracking in cluster code.
-- [ ] simper inconsistent-error handling code, no need to stall.
-** TODO [#C] Support for live upgrades.
-
-Allow brokers in a running cluster to be replaced one-by-one with a new version.
-
-The old cluster protocol was unstable because any changes in broker
-state caused changes to the cluster protocol.The new design should be
-much more stable.
-
-Points to implement:
-- Brokers should ignore unknown controls (with a warning) rather than an error.
-- Limit logging frequency for unknown control warnings.
-- Add a version number at front of every CPG message. Determines how the
- rest of the message is decoded. (allows for entirely new encodings e.g. AMQP 1.0)
-- Protocol version XML element in cluster.xml, on each control.
-- Initial status protocol to include protocol version number.
-
-** TODO [#C] Support for AMQP 1.0.
-
-* Testing
-** TODO [#A] Pass all existing cluster tests.
-Requires [[Defer and async completion of wiring commands.]]
-** TODO [#A] New cluster tests.
-Stress tests & performance benchmarks focused on changes in new cluster:
-- concurrency by queues rather than connections.
-- different handling shared queues when consuemrs are on different brokers.