Diffstat (limited to 'cpp/design_docs/new-cluster-plan.txt')
-rw-r--r--  cpp/design_docs/new-cluster-plan.txt | 545
1 file changed, 384 insertions(+), 161 deletions(-)
diff --git a/cpp/design_docs/new-cluster-plan.txt b/cpp/design_docs/new-cluster-plan.txt
index 32e3f710e7..781876e55a 100644
--- a/cpp/design_docs/new-cluster-plan.txt
+++ b/cpp/design_docs/new-cluster-plan.txt
@@ -17,150 +17,376 @@
# specific language governing permissions and limitations
# under the License.
-* Status of impementation
-Meaning of priorities:
-[#A] Essential for basic functioning.
-[#B] Required for first release.
-[#C] Can be addressed in a later release.
+Notes on new cluster implementation. See also: new-cluster-design.txt
-The existig prototype is bare bones to do performance benchmarks:
-- Implements publish and consumer locking protocol.
-- Defered delivery and asynchronous completion of message.
-- Optimize the case all consumers are on the same node.
-- No new member updates, no failover updates, no transactions, no persistence etc.
+* Implementation plan.
-Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
-** Similarities to existing cluster.
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- optimize the special case where all consumers are on the same node.
+- measure performance: compare active-passive and active-active modes of use.
-/Active-active/: the new cluster can be a drop-in replacement for the
-old, existing tests & customer deployment configurations are still
-valid.
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
-/Virtual synchrony/: Uses corosync to co-ordinate activity of members.
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protocol etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster IDs to messages recovered from store, don't replicate.
-/XML controls/: Uses XML to define the primitives multicast to the
-cluster.
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
-** Differences with existing cluster.
+* Task list
-/Report rather than predict consumption/: brokers explicitly tell each
-other which messages have been acquired or dequeued. This removes the
-major cause of bugs in the existing cluster.
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
-/Queue consumer locking/: to avoid duplicates only one broker can acquire or
-dequeue messages at a time - while has the consume-lock on the
-queue. If multiple brokers are consuming from the same queue the lock
-is passed around to time-share access to the queue.
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
-/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
-the concurrency of the host) to allow concurrent processing on
-different queues. Queues are hashed onto the groups.
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued on other members.
+- acquire then reject: verify goes on alt-exchange once only.
-* Completed tasks
-** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
- CLOSED: [2011-10-05 Wed 16:03]
+*** DONE broker::Cluster interface and call points.
-Defines broker::Cluster interface and call points.
-Initial interface commite
+Initial interface committed.
-Main classes
-Core: central object holding cluster classes together (replaces cluster::Cluster)
-BrokerContext: implements broker::Cluster interface.
-QueueContext: Attached to a broker::Queue, holds cluster status.
-MessageHolder:holds local messages while they are being enqueued.
+*** Main classes
-Implements multiple CPG groups for better concurrency.
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
-** DONE [#A] Large message replication.
- CLOSED: [2011-10-05 Wed 17:22]
-Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame)
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and MessageHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
-* Open questions
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both MessageHandler and BrokerHandler
+
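+As a first orientation, a rough C++ skeleton of how these classes might
+fit together. Illustrative only: apart from broker::Cluster and
+cluster::Core the names and signatures below are assumptions, and the
+broker types (Message, QueuedMessage) come from existing broker headers.
+
+namespace cluster {
+
+class MessageHandler;     // deliver thread only
+class QueueOwnerHandler;  // deliver, connection and timer threads
+
+class Core {              // replaces cluster::Cluster; thread safe
+  public:
+    MessageHandler& getMessageHandler();
+    QueueOwnerHandler& getQueueOwnerHandler();
+};
+
+class BrokerHandler : public broker::Cluster {  // connection threads
+  public:
+    // Intercept points: multicast an event describing the local action.
+    void routing(const Message&);
+    void enqueue(const QueuedMessage&);
+    void routed(const Message&);
+    void acquire(const QueuedMessage&);
+    void release(const QueuedMessage&);
+    void accept(const QueuedMessage&);
+    void reject(const QueuedMessage&);
+  private:
+    Core& core;           // shared cluster state
+};
+
+} // namespace cluster
+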
+The per-function sketches below illustrate only the "happy path"; error
+handling is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
+
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
+
+Members:
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
+
+NOTE: localMessage is also modified by MessageHandler.
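+
+One way to realize the noReplicate/isRouting members is thread-local
+state with an RAII guard; a sketch (C++11 thread_local for brevity, the
+real code would more likely use the broker's existing
+thread-specific-storage wrapper):
+
+// Cluster-initiated broker calls set noReplicate so that the intercept
+// functions below become no-ops instead of re-multicasting the action.
+thread_local bool noReplicate = false;
+thread_local bool isRouting = false;
+
+struct NoReplicateScope {   // restores the flag on scope exit
+    bool saved;
+    NoReplicateScope() : saved(noReplicate) { noReplicate = true; }
+    ~NoReplicateScope() { noReplicate = saved; }
+};
+
+The MessageHandler pseudocode below writes this pattern as
+"with noReplicate=true: ...".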
+
+broker::Cluster intercept functions:
+
+routing(msg):
+ if noReplicate: return
+ # Suppress everything except enqueues while we are routing.
+ # We don't want to replicate acquires & dequeues caused by an enqueue,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+ if routingSequence == 0: # thread local
+   routingSequence = nextRoutingSequence()
+ mcast create(encode(qmsg.msg), routingSequence)
+ mcast enqueue(qmsg.q, routingSequence)
+
+routed(msg):
+ if noReplicate: return
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if qmsg.msg.id: mcast acquire(qmsg)
+
+release(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast release(qmsg)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast accept(qmsg)
+
+reject(QueuedMessage):
+ isRejecting = true
+ mcast reject(qmsg)
+
+# FIXME no longer needed?
+drop(QueuedMessage):
+ cleanup(qmsg)
+
+*** MessageHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
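+
+A possible C++ rendering of these structures (a sketch; NodeId,
+SequenceNumber and the broker message types are placeholders):
+
+#include <stdint.h>
+#include <list>
+#include <map>
+#include <string>
+#include <utility>
+
+typedef uint32_t NodeId;                              // placeholder
+typedef uint64_t SequenceNumber;                      // placeholder
+typedef std::string QueueName;
+typedef std::pair<NodeId, SequenceNumber> MessageId;  // (sender, seq)
+struct Message {};         // placeholder: broker message type
+struct QueuedMessage {};   // placeholder: broker queued-message type
+
+struct QueueKey {
+    MessageId id; QueueName q;
+    bool operator<(const QueueKey& k) const {         // map ordering
+        return id < k.id || (id == k.id && q < k.q);
+    }
+};
+struct QueueEntry { QueuedMessage qmsg; NodeId acquired; };
+struct Node {
+    std::map<SequenceNumber, Message> routing;  // messages being routed
+    std::list<QueueKey> acquired;               // entries this node holds
+};
+
+std::map<QueueKey, QueueEntry> enqueued;
+std::map<NodeId, Node> node;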
+
+Mcast messages in Message class:
+
+create(msg,seq):
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+ enqueued[id,q] = (localMessage[seq], acquired=None)
+ else:
+ msg = node[sender].routing[seq]
+ with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+ enqueued[id,q] = (qmsg, acquired=None)
+
+routed(seq):
+ if sender == self: localMessage.erase(seq)
+ else: node[sender].routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q):
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self:
+  with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ node[sender].routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q):
+ node[sender].routing.erase(id)
+
+dequeue(id,q):
+ entry = enqueued[id,q]
+ enqueued.erase((id,q))
+ node[entry.acquired].acquired.erase((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+    if (consumersStopped) return false;
+    ++consumersBusy;
+    bool accepted = do_regular_dispatch_body();
+    if (--consumersBusy == 0) consumersBusyMonitor.notify();
+    return accepted;
+}
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest, i.e. the queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery, ownership passes to the next
+node in node-id order after the old owner (treating the nodes as a
+circular list) that is interested in the queue.
+
+Queue consumers are initially stopped; they are only started when we
+get ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, so it
+needs locking. A thread-safe object per queue holds the queue's
+ownership status:
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
-** TODO [#A] Queue sequence numbers vs. independant message IDs.
- SCHEDULED: <2011-10-07 Fri>
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
-Current prototype uses queue sequence numbers to identify
-message. This is tricky for updating new members as the sequence
-numbers are only known on delivery.
+Multicast messages in QueueOwner class:
-Independent message IDs that can be generated and sent with the message simplify
-this and potentially allow performance benefits by relaxing total ordering.
-However they imply additional map lookups that might hurt performance.
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
-- [ ] Prototype independent message IDs, check performance.
+release(q):
+ assert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q] (see sketch below)
+ if owner[q] == self: ownership[q].take()
-* Outstanding Tasks
-** TODO [#A] Defer and async completion of wiring commands.
+cancel(q):
+ assert(owner[q] != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
-Testing requirement: Many tests assume wiring changes are visible
-across the cluster once the commad completes.
+member-leaves:
+ for q in owner: if owner[q] == left: release(q) # as if mcast by left
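+
+The "circular search" could be implemented over an ordered set of node
+IDs, for example (a sketch; NodeId is a placeholder type):
+
+#include <set>
+#include <stdint.h>
+
+typedef uint32_t NodeId;  // placeholder
+
+// Next interested node in node-id order strictly after 'from', wrapping
+// around; returns 'from' itself only if it is the sole interested node.
+NodeId nextOwner(const std::set<NodeId>& interested, NodeId from) {
+    std::set<NodeId>::const_iterator i = interested.upper_bound(from);
+    if (i == interested.end()) i = interested.begin();  // wrap around
+    return *i;
+}
+
+Because every member sees the same CPG events in the same order, each
+member computes the same new owner without any further communication.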
-Name clashes: need to avoid race if same name queue/exchange declared
-on 2 brokers simultaneously
+Need 2 more intercept points in broker::Cluster:
-** TODO [#A] Passing all existing cluster tests.
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
-The new cluster should be a drop-in replacement for the old, so it
-should be able to pass all the existing tests.
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
-** TODO [#A] Update to new members joining.
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would
+- build messages and associate with cluster IDs.
+- push queue ops to the appropriate PollableQueue to be dispatched in the
+  queue's own thread context (see sketch below).
+
+Serializing operations on the same queue preserves ordering and avoids
+contention, while taking advantage of the independence of operations on
+separate queues.
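+
+A sketch of the dispatch structure, assuming the existing
+qpid::sys::PollableQueue template and boost (details are assumptions):
+
+#include <map>
+#include <string>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/PollableQueue.h"
+
+typedef std::string QueueName;
+typedef boost::function<void()> QueueOp;
+typedef qpid::sys::PollableQueue<QueueOp> OpQueue;
+
+// One pollable queue of closures per broker queue: operations on the
+// same queue stay ordered, while different queues dispatch concurrently.
+std::map<QueueName, boost::shared_ptr<OpQueue> > opQueues;
+
+// Called in the CPG deliver thread: decode the event, then hand the
+// resulting queue operation to the per-queue context.
+void deliverQueueOp(const QueueName& q, const QueueOp& op) {
+    opQueues[q]->push(op);
+}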
-Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
-- implicit sequence numbers are more tricky to replicate to new member.
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
-Update individual objects (queues and exchanges) independently.
-- create queues first, then update all queues and exchanges in parallel.
-- multiple updater threads, per queue/exchange.
-- updater sends messages to special exchange(s) (not using extended AMQP controls)
+** TODO [#B] Large message replication.
+Should multicast encode messages in fixed-size (64k) buffers?
+Can't assume we can send message in one chunk.
+For 0-10 can use channel numbers & send whole frames packed into larger buffer.
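+
+A fragmentation sketch under those assumptions (mcastBuffer() stands in
+for the real multicast call):
+
+#include <stddef.h>
+#include <algorithm>
+
+const size_t MCAST_BUFFER_SIZE = 64 * 1024;
+
+void mcastBuffer(const char* data, size_t size, bool last);  // placeholder
+
+// Split an encoded message across fixed-size multicast buffers; the
+// receiver reassembles fragments until it sees the 'last' flag.
+void mcastFragmented(const char* data, size_t size) {
+    size_t offset = 0;
+    do {
+        size_t chunk = std::min(MCAST_BUFFER_SIZE, size - offset);
+        mcastBuffer(data + offset, chunk, offset + chunk == size);
+        offset += chunk;
+    } while (offset < size);
+}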
+** TODO [#B] Transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Sequence number to generate per-node tx IDs.
+Replicate transaction completion.
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages;
+they need to be batched into larger CPG messages for efficiency.
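+
+A batching sketch (EventBatcher and mcastRaw() are hypothetical names):
+
+#include <stddef.h>
+#include <string>
+
+class EventBatcher {
+  public:
+    explicit EventBatcher(size_t max) : maxSize(max) {}
+
+    // Append one small encoded event; flush first if it would overflow.
+    void add(const std::string& event) {
+        if (buffer.size() + event.size() > maxSize) flush();
+        buffer += event;
+    }
+
+    // Send accumulated events as one CPG message. Would also be called
+    // from an idle or timer callback so a batch never lingers.
+    void flush() {
+        if (buffer.empty()) return;
+        mcastRaw(buffer);  // placeholder for the real CPG multicast
+        buffer.clear();
+    }
+  private:
+    void mcastRaw(const std::string&);  // placeholder
+    size_t maxSize;
+    std::string buffer;
+};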
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
-Queue updater:
-- marks the queue position at the sync point
-- sends messages starting from the sync point working towards the head of the queue.
-- send "done" message.
-Note: updater remains active throughout, consuming clients actually reduce the
-size of the update.
+Test: enhance test_store.cpp to defer enqueueComplete until a special message is received.
-Queue updatee:
-- enqueues received from CPG: add to back of queue as normal.
-- dequeues received from CPG: apply if found, else save to check at end of update.
-- messages from updater: add to the *front* of the queue.
-- update complete: apply any saved dequeues.
+Async callback uses *requestIOProcessing* to queue action on IO thread.
-Exchange updater:
-- updater: send snapshot of exchange as it was at the sync point.
+** TODO [#B] Async completion of accept when dequeue completes.
+The interface is already there on broker::Message; we just need to ensure
+that store and cluster implementations call it appropriately.
-Exchange updatee:
-- queue exchange operations after the sync point.
-- when snapshot is received: apply saved operations.
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind calls, replicate the encoded declare/destroy/bind commands.
-Updater remains active throughout.
-Updatee stalls clients until the update completes.
+** TODO [#B] New members joining - first pass
-Updating queue/exchange/binding objects is via the same encode/decode
-that is used by the store. Updatee to use recovery interfaces to
-recover?
+Re-use update code from old cluster but don't replicate sessions &
+connections.
-** TODO [#A] Failover updates to client.
-Implement the amq.failover exchange to notify clients of membership.
+Need to extend it to send cluster IDs with messages.
-** TODO [#B] Initial status protocol.
-Handshake to give status of each broker member to new members joining.
-Status includes
-- persistent store state (clean, dirty)
-- cluster protocol version.
+Need to replicate the queue ownership data as part of the update.
-** TODO [#B] Persistent cluster support.
-Initial status protoocl to support persistent start-up (see existing code)
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code)
Only one broker recovers from store, update to others.
Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
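+
+A recovery sketch under those assumptions (names hypothetical; see the
+NoReplicateScope sketch in the BrokerHandler section):
+
+// Recovering broker: stamp each message from the store with a locally
+// generated cluster ID and suppress replication of its enqueue; other
+// members receive these messages via update instead.
+void recoverMessage(broker::Queue& q, Message& msg) {
+    msg.setClusterId(MessageId(self, nextRoutingSequence()));
+    NoReplicateScope guard;  // cluster intercept points become no-ops
+    q.recover(msg);          // normal store-recovery path, no mcast
+}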
-** TODO [#B] Management support
-Replicate management methods that modify queues - e.g. move, purge.
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Ways other than via a consumer that messages can be taken off a queue:
+
+NOTE: These are not controlled by the queue lock; how do we make them consistent?
+
Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
@@ -169,38 +395,66 @@ Target broker may not have all messages on other brokers for purge/destroy.
Need to add callpoints & mcast messages to replicate these?
-** TODO [#B] TX transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Running brokers exchange TX information.
-New broker update includes TX information.
-
- // FIXME aconway 2010-10-18: As things stand the cluster is not
- // compatible with transactions
- // - enqueues occur after routing is complete
- // - no call to Cluster::enqueue, should be in Queue::process?
- // - no transaction context associated with messages in the Cluster interface.
- // - no call to Cluster::accept in Queue::dequeueCommitted
-
-** TODO [#B] DTX transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Running brokers exchange DTX information.
-New broker update includes DTX information.
-
-** TODO [#B] Async completion of accept.
-When this is fixed in the standalone broker, it should be fixed for cluster.
-
-** TODO [#B] Network partitions and quorum.
-Re-use existing implementation.
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting; we still have to deal with remote
+multicasting (note the existing cluster does not do this). Something like:
+- when over bounds multicast a flow-control event.
+- on delivery of flow-control all members stop polling to read client connections
+- when back under bounds send flow-control-end, all members resume
+- if the flow-controlling member dies, the others resume
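+
+A sketch of the delivered-event bookkeeping (names hypothetical):
+
+#include <set>
+#include <stdint.h>
+
+typedef uint32_t NodeId;  // placeholder
+
+// Driven from the deliver thread; every member sees the same events in
+// the same order, so all members block and resume together.
+class FlowControl {
+  public:
+    void flowStart(NodeId n) {   // member n multicast flow-control
+        if (blocked.insert(n).second && blocked.size() == 1)
+            stopReadingClients();
+    }
+    void flowEnd(NodeId n) {     // flow-control-end, or member n died
+        if (blocked.erase(n) && blocked.empty())
+            resumeReadingClients();
+    }
+  private:
+    void stopReadingClients();   // placeholder: stop polling client reads
+    void resumeReadingClients(); // placeholder: resume polling
+    std::set<NodeId> blocked;    // members currently asserting flow control
+};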
+
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#B] Better concurrency, scalability on multi-core hosts.
+Introduce a PollableQueue of operations per broker queue. Queue up mcast
+operations (enqueue, acquire, accept etc.) to be handled concurrently
+on different queues. Performance testing to verify improved scalability.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
** TODO [#C] Allow non-replicated exchanges, queues.
-Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects.
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
- save replicated status to store.
- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
Replicated queue: replicate all messages.
-Replicated exchange: replicate bindings to replicated queues only.
-Configurable default? Defaults to true.
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at the back of the updater's queue, at message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
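+
+A sketch of why this preserves order: the updater iterates from back to
+front while the updatee pushes each message onto the front (deque and
+Message are stand-ins for the real types):
+
+#include <deque>
+
+struct Message { int body; };  // placeholder
+
+// Updater sends m3, m2, m1 (back to front); pushing each onto the front
+// leaves the updatee's queue as m1, m2, m3 - the original order.
+void updateFront(std::deque<Message>& q, const Message& m) {
+    q.push_front(m);
+}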
+
+TODO: Is there any way to eliminate the stall for wiring?
** TODO [#C] Refactoring of common concerns.
@@ -215,40 +469,9 @@ Look for ways to capitalize on the similarity & simplify the code.
In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
-
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are processed in the CPG deliver thread context, which
+serializes all the work. We only need ordering on a per-queue basis; can we
+enqueue in parallel on different queues, and will that improve performance?
** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in the decision to deliver an immediate message?
-** TODO [#C] Remove old cluster hacks and workarounds
-The old cluster has workarounds in the broker code that can be removed.
-- [ ] drop code to replicate management model.
-- [ ] drop timer workarounds for TTL, management, heartbeats.
-- [ ] drop "cluster-safe assertions" in broker code.
-- [ ] drop connections, sessions, management from cluster update.
-- [ ] drop security workarounds: cluster code now operates after message decoding.
-- [ ] drop connection tracking in cluster code.
-- [ ] simper inconsistent-error handling code, no need to stall.
-** TODO [#C] Support for live upgrades.
-
-Allow brokers in a running cluster to be replaced one-by-one with a new version.
-
-The old cluster protocol was unstable because any changes in broker
-state caused changes to the cluster protocol.The new design should be
-much more stable.
-
-Points to implement:
-- Brokers should ignore unknown controls (with a warning) rather than an error.
-- Limit logging frequency for unknown control warnings.
-- Add a version number at front of every CPG message. Determines how the
- rest of the message is decoded. (allows for entirely new encodings e.g. AMQP 1.0)
-- Protocol version XML element in cluster.xml, on each control.
-- Initial status protocol to include protocol version number.
-
-** TODO [#C] Support for AMQP 1.0.
-
-* Testing
-** TODO [#A] Pass all existing cluster tests.
-Requires [[Defer and async completion of wiring commands.]]
-** TODO [#A] New cluster tests.
-Stress tests & performance benchmarks focused on changes in new cluster:
-- concurrency by queues rather than connections.
-- different handling shared queues when consuemrs are on different brokers.