From 0d5cd531e2ca52e914e62e196c48bcbae8a96649 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 1 Nov 2010 14:45:27 +0000 Subject: Rename cpp/design to cpp/design_docs to avoid conflict with DESIGN on windows. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1029686 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/design/new-cluster-design.txt | 435 ------------------------------ cpp/design/new-cluster-plan.txt | 473 --------------------------------- cpp/design_docs/new-cluster-design.txt | 435 ++++++++++++++++++++++++++++++ cpp/design_docs/new-cluster-plan.txt | 473 +++++++++++++++++++++++++++++++++ 4 files changed, 908 insertions(+), 908 deletions(-) delete mode 100644 cpp/design/new-cluster-design.txt delete mode 100644 cpp/design/new-cluster-plan.txt create mode 100644 cpp/design_docs/new-cluster-design.txt create mode 100644 cpp/design_docs/new-cluster-plan.txt (limited to 'cpp') diff --git a/cpp/design/new-cluster-design.txt b/cpp/design/new-cluster-design.txt deleted file mode 100644 index 7adb46fee3..0000000000 --- a/cpp/design/new-cluster-design.txt +++ /dev/null @@ -1,435 +0,0 @@ --*-org-*- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -* A new design for Qpid clustering. - -** Issues with current design. - -The cluster is based on virtual synchrony: each broker multicasts -events and the events from all brokers are serialized and delivered in -the same order to each broker. - -In the current design raw byte buffers from client connections are -multicast, serialized and delivered in the same order to each broker. - -Each broker has a replica of all queues, exchanges, bindings and also -all connections & sessions from every broker. Cluster code treats the -broker as a "black box", it "plays" the client data into the -connection objects and assumes that by giving the same input, each -broker will reach the same state. - -A new broker joining the cluster receives a snapshot of the current -cluster state, and then follows the multicast conversation. - -*** Maintenance issues. - -The entire state of each broker is replicated to every member: -connections, sessions, queues, messages, exchanges, management objects -etc. Any discrepancy in the state that affects how messages are -allocated to consumers can cause an inconsistency. - -- Entire broker state must be faithfully updated to new members. -- Management model also has to be replicated. -- All queues are replicated, can't have unreplicated queues (e.g. for management) - -Events that are not deterministically predictable from the client -input data stream can cause inconsistencies. In particular use of -timers/timestamps require cluster workarounds to synchronize. 
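A minimal stand-alone illustration of the timer problem (a sketch, not
Qpid code): any decision taken against the local clock, such as a TTL
expiry check, can evaluate differently on two brokers that process the
same replicated event a few milliseconds apart.

#include <chrono>
#include <iostream>

// Each broker consults its *own* clock rather than a value carried in
// the replicated event stream, so the answer is not deterministic
// across members.
bool expired(std::chrono::steady_clock::time_point deadline) {
    return std::chrono::steady_clock::now() >= deadline;
}

int main() {
    auto deadline = std::chrono::steady_clock::now()
                    + std::chrono::milliseconds(5);
    std::cout << "expired=" << expired(deadline) << std::endl;
    // A second broker evaluating the same check slightly later, or with
    // a slightly different clock, can get the opposite answer; this is
    // why TTL, heartbeats and management timestamps need workarounds.
    return 0;
}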
- -A member that encounters an error which is not encounted by all other -members is considered inconsistent and will shut itself down. Such -errors can come from any area of the broker code, e.g. different -ACL files can cause inconsistent errors. - -The following areas required workarounds to work in a cluster: - -- Timers/timestamps in broker code: management, heartbeats, TTL -- Security: cluster must replicate *after* decryption by security layer. -- Management: not initially included in the replicated model, source of many inconsistencies. - -It is very easy for someone adding a feature or fixing a bug in the -standalone broker to break the cluster by: -- adding new state that needs to be replicated in cluster updates. -- doing something in a timer or other non-connection thread. - -It's very hard to test for such breaks. We need a looser coupling -and a more explicitly defined interface between cluster and standalone -broker code. - -*** Performance issues. - -Virtual synchrony delivers all data from all clients in a single -stream to each broker. The cluster must play this data thru the full -broker code stack: connections, sessions etc. in a single thread -context in order to get identical behavior on each broker. The cluster -has a pipelined design to get some concurrency but this is a severe -limitation on scalability in multi-core hosts compared to the -standalone broker which processes each connection in a separate thread -context. - -** A new cluster design. - -Clearly defined interface between broker code and cluster plug-in. - -Replicate queue events rather than client data. -- Broker behavior only needs to match per-queue. -- Smaller amount of code (queue implementation) that must behave predictably. -- Events only need be serialized per-queue, allows concurrency between queues - -Use a moving queue ownership protocol to agree order of dequeues. -No longer relies on identical state and lock-step behavior to cause -identical dequeues on each broker. - -Each queue has an associated thread-context. Events for a queue are executed -in that queues context, in parallel with events for other queues. - -*** Requirements - -The cluster must provide these delivery guarantees: - -- client sends transfer: message must be replicated and not lost even if the local broker crashes. -- client acquires a message: message must not be delivered on another broker while acquired. -- client accepts message: message is forgotten, will never be delivered or re-queued by any broker. -- client releases message: message must be re-queued on cluster and not lost. -- client rejects message: message must be dead-lettered or discarded and forgotten. -- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster. - -Each guarantee takes effect when the client receives a *completion* -for the associated command (transfer, acquire, reject, accept) - -*** Broker receiving messages - -On recieving a message transfer, in the connection thread we: -- multicast a message-received event. -- enqueue and complete the transfer when it is self-delivered. - -Other brokers enqueue the message when they recieve the message-received event. - -Enqueues are queued up with other queue operations to be executed in the -thread context associated with the queue. - -*** Broker sending messages: moving queue ownership - -Each queue is *owned* by at most one cluster broker at a time. Only -that broker may acquire or dequeue messages. 
The owner multicasts -notification of messages it acquires/dequeues to the cluster. -Periodically the owner hands over ownership to another interested -broker, providing time-shared access to the queue among all interested -brokers. - -We assume the same IO-driven dequeuing algorithm as the standalone -broker with one modification: queues can be "locked". A locked queue -is not available for dequeuing messages and will be skipped by the -output algorithm. - -At any given time only those queues owned by the local broker will be -unlocked. - -As messages are acquired/dequeued from unlocked queues by the IO threads -the broker multicasts acquire/dequeue events to the cluster. - -When an unlocked queue has no more consumers with credit, or when a -time limit expires, the broker relinquishes ownership by multicasting -a release-queue event, allowing another interested broker to take -ownership. - -*** Asynchronous completion of accept -### HERE -In acknowledged mode a message is not forgotten until it is accepted, -to allow for requeue on rejection or crash. The accept should not be -completed till the message has been forgotten. - -On receiving an accept the broker: -- dequeues the message from the local queue -- multicasts an "accept" event -- completes the accept asynchronously when the dequeue event is self delivered. - -NOTE: The message store does not currently implement asynchronous -completions of accept, this is a bug. - -** Inconsistent errors. - -The new design eliminates most sources of inconsistent errors -(connections, sessions, security, management etc.) The only points -where inconsistent errors can occur are at enqueue and dequeue (most -likely store-related errors.) - -The new design can use the exisiting error-handling protocol with one -major improvement: since brokers are no longer required to maintain -identical state they do not have to stall processing while an error is -being resolved. - -#TODO: The only source of dequeue errors is probably an unrecoverable journal failure. - -** Updating new members - -When a new member (the updatee) joins a cluster it needs to be brought -up to date with the rest of the cluster. An existing member (the -updater) sends an "update". - -In the old cluster design the update is a snapshot of the entire -broker state. To ensure consistency of the snapshot both the updatee -and the updater "stall" at the start of the update, i.e. they stop -processing multicast events and queue them up for processing when the -update is complete. This creates a back-log of work to get through, -which leaves them lagging behind the rest of the cluster till they -catch up (which is not guaranteed to happen in a bounded time.) - -With the new cluster design only exchanges, queues, bindings and -messages need to be replicated. - -Update of wiring (exchanges, queues, bindings) is the same as current -design. - -Update of messages is different: -- per-queue rather than per-broker, separate queues can be updated in parallel. -- updates queues in reverse order to eliminate unbounded catch-up -- does not require updater & updatee to stall during update. - -Replication events, multicast to cluster: -- enqueue(q,m): message m pushed on back of queue q . -- acquire(q,m): mark m acquired -- dequeue(q,m): forget m. -Messages sent on update connection: -- update_front(q,m): during update, receiver pushes m to *front* of q -- update_done(q): during update, update of q is complete. 
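As a purely illustrative rendering of the events just listed, they could
be carried as a small tagged structure; the type and field names below
are assumptions, not the actual wire format. The updater and updatee
algorithms that use these events follow.

#include <cstdint>
#include <string>

// Replication events multicast to the cluster, plus the two messages
// sent point-to-point on the update connection (sketch only).
enum class ClusterEventType {
    Enqueue,      // enqueue(q,m): m pushed on back of queue q
    Acquire,      // acquire(q,m): mark m acquired
    Dequeue,      // dequeue(q,m): forget m
    UpdateFront,  // update_front(q,m): receiver pushes m to *front* of q
    UpdateDone    // update_done(q): update of q is complete
};

struct ClusterEvent {
    ClusterEventType type;
    std::string queue;     // q
    uint64_t position;     // m, identified by queue name + position
    std::string body;      // encoded message, only for Enqueue/UpdateFront
};

int main() {
    ClusterEvent e{ClusterEventType::Enqueue, "my-queue", 1, "payload"};
    return e.position == 1 ? 0 : 1;
}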
- -Updater: -- when updatee joins set iterator i = q.end() -- while i != q.begin(): --i; send update_front(q,*i) to updatee -- send update_done(q) to updatee - -Updatee: -- q initially in locked state, can't dequeue locally. -- start processing replication events for q immediately (enqueue, dequeue, acquire etc.) -- receive update_front(q,m): q.push_front(m) -- receive update_done(q): q can be unlocked for local dequeing. - -Benefits: -- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated. -- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update. -- During update consumers actually help by removing messages before they need to be updated. -- Needs no separate "work to do" queue, only the broker queues themselves. - -# TODO how can we recover from updater crashing before update complete? -# Clear queues that are not updated & send request for udpates on those queues? - -# TODO updatee may receive a dequeue for a message it has not yet seen, needs -# to hold on to that so it can drop the message when it is seen. -# Similar problem exists for wiring? - -** Cluster API - -The new cluster API is similar to the MessageStore interface. -(Initially I thought it would be an extension of the MessageStore interface, -but as the design develops it seems better to make it a separate interface.) - -The cluster interface captures these events: -- wiring changes: queue/exchange declare/bind -- message enqueued/acquired/released/rejected/dequeued. - -The cluster will require some extensions to the Queue: -- Queues can be "locked", locked queues are ignored by IO-driven output. -- Cluster must be able to apply queue events from the cluster to a queue. - These appear to fit into existing queue operations. - -** Maintainability - -This design gives us more robust code with a clear and explicit interfaces. - -The cluster depends on specific events clearly defined by an explicit -interface. Provided the semantics of this interface are not violated, -the cluster will not be broken by changes to broker code. - -The cluster no longer requires identical processing of the entire -broker stack on each broker. It is not affected by the details of how -the broker allocates messages. It is independent of the -protocol-specific state of connections and sessions and so is -protected from future protocol changes (e.g. AMQP 1.0) - -A number of specific ways the code will be simplified: -- drop code to replicate management model. -- drop timer workarounds for TTL, management, heartbeats. -- drop "cluster-safe assertions" in broker code. -- drop connections, sessions, management from cluster update. -- drop security workarounds: cluster code now operates after message decoding. -- drop connection tracking in cluster code. -- simper inconsistent-error handling code, no need to stall. - -** Performance - -The only way to verify the relative performance of the new design is -to prototype & profile. The following points suggest the new design -may scale/perform better: - -Some work moved from virtual synchrony thread to connection threads: -- All connection/session logic moves to connection thread. -- Exchange routing logic moves to connection thread. -- On local broker dequeueing is done in connection thread -- Local broker dequeue is IO driven as for a standalone broker. - -For queues with all consumers on a single node dequeue is all -IO-driven in connection thread. 
Pay for time-sharing only if queue has -consumers on multiple brokers. - -Doing work for different queues in parallel scales on multi-core boxes when -there are multiple queues. - -One difference works against performance, thre is an extra -encode/decode. The old design multicasts raw client data and decodes -it in the virtual synchrony thread. The new design would decode -messages in the connection thread, re-encode them for multicast, and -decode (on non-local brokers) in the virtual synchrony thread. There -is extra work here, but only in the *connection* thread: on a -multi-core machine this happens in parallel for every connection, so -it probably is not a bottleneck. There may be scope to optimize -decode/re-encode by re-using some of the original encoded data, this -could also benefit the stand-alone broker. - -** Asynchronous queue replication - -The existing "asynchronous queue replication" feature maintains a -passive backup passive backup of queues on a remote broker over a TCP -connection. - -The new cluster replication protocol could be re-used to implement -asynchronous queue replication: its just a special case where the -active broker is always the queue owner and the enqueue/dequeue -messages are sent over a TCP connection rather than multicast. - -The new update update mechanism could also work with 'asynchronous -queue replication', allowing such replication (over a TCP connection -on a WAN say) to be initiated after the queue had already been created -and been in use (one of the key missing features). - -** Increasing Concurrency and load sharing - -The current cluster is bottlenecked by processing everything in the -CPG deliver thread. By removing the need for identical operation on -each broker, we open up the possiblility of greater concurrency. - -Handling multicast enqueue, acquire, accpet, release etc: concurrency -per queue. Operatons on different queues can be done in different -threads. - -The new design does not force each broker to do all the work in the -CPG thread so spreading load across cluster members should give some -scale-up. - -** Misc outstanding issues & notes - -Replicating wiring -- Need async completion of wiring commands? -- qpid.sequence_counter: need extra work to support in new design, do we care? - -Cluster+persistence: -- finish async completion: dequeue completion for store & cluster -- cluster restart from store: clean stores *not* identical, pick 1, all others update. -- need to generate cluster ids for messages recovered from store. - -Live updates: we don't need to stall brokers during an update! -- update on queue-by-queue basis. -- updatee locks queues during update, no dequeue. -- update in reverse: don't update messages dequeued during update. -- updatee adds update messages at front (as normal), replicated messages at back. -- updater starts from back, sends "update done" when it hits front of queue. - -Flow control: need to throttle multicasting -1. bound the number of outstanding multicasts. -2. ensure the entire cluster keeps up, no unbounded "lag" -The existing design uses read-credit to solve 1., and does not solve 2. -New design should stop reading on all connections while flow control -condition exists? - -Can federation also be unified, at least in configuration? - -Consider queues (and exchanges?) as having "reliability" attributes: -- persistent: is the message stored on disk. -- backed-up (to another broker): active/passive async replication. -- replicated (to a cluster): active/active multicast replication to cluster. 
-- federated: federation link to a queue/exchange on another broker. - -"Reliability" seems right for the first 3 but not for federation, is -there a better term? - -Clustering and scalability: new design may give us the flexibility to -address scalability as part of cluster design. Think about -relationship to federation and "fragmented queues" idea. - -* Design debates/descisions - -** Active/active vs. active passive - -An active-active cluster can be used in an active-passive mode. In -this mode we would like the cluster to be as efficient as a strictly -active-passive implementation. - -An active/passive implementation allows some simplifications over active/active: -- drop Queue ownership and locking -- don't need to replicate message acquisition. -- can do immediate local enqueue and still guarantee order. - -Active/passive introduces a few extra requirements: -- Exactly one broker hast to take over if primary fails. -- Passive members must refuse client connections. -- On failover, clients must re-try all known addresses till they find the active member. - -Active/active benefits: -- A broker failure only affects the subset of clients connected to that broker. -- Clients can switch to any other broker on failover -- Backup brokers are immediately available on failover. -- Some load sharing: reading from client + multicast only done on direct node. - -Active/active drawbacks: -- Co-ordinating message acquisition may impact performance (not tested) -- Code may be more complex that active/passive. - -Active/passive benefits: -- Don't need message allocation strategy, can feed consumers at top speed. -- Code may be simpler than active/active. - -Active/passive drawbacks: -- All clients on one node so a failure affects every client in the system. -- After a failure there is a "reconnect storm" as every client reconnects to the new active node. -- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary. -- Clients must find the single active node, may involve multiple connect attempts. - -** Total ordering. - -Initial thinking: allow message ordering to differ between brokers. -New thinking: use CPG total ordering, get identical ordering on all brokers. -- Allowing variation in order introduces too much chance of unexpected behavior. -- Usign total order allows other optimizations, see Message Identifiers below. - -** Message identifiers. - -Initial thinking: message ID = CPG node id + 64 bit sequence number. -This involves a lot of mapping between cluster IDs and broker messsages. - -New thinking: message ID = queue name + queue position. -- Removes most of the mapping and memory management for cluster code. -- Requires total ordering of messages (see above) - -** Message rejection - -Initial thinking: add special reject/rejected points to cluster interface so -rejected messages could be re-queued without multicast. - -New thinking: treat re-queueing after reject as entirely new message. -- Simplifies cluster interface & implementation -- Not on the critical path. diff --git a/cpp/design/new-cluster-plan.txt b/cpp/design/new-cluster-plan.txt deleted file mode 100644 index b5cd353fc0..0000000000 --- a/cpp/design/new-cluster-plan.txt +++ /dev/null @@ -1,473 +0,0 @@ --*-org-*- - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -Notes on new cluster implementation. See also: new-cluster-design.txt - -* Implementation plan. - -Co-existence with old cluster code and tests: -- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. -- Double up tests with old version/new version as the new code develops. - -Minimal POC for message delivery & perf test. -- no wiring replication, no updates, no failover, no persistence, no async completion. -- just implement publish and acquire/dequeue locking protocol. -- optimize the special case where all consumers are on the same node. -- measure performance: compare active-passive and active-active modes of use. - -Full implementation of transient cluster -- Update (based on existing update), async completion etc. -- Passing all existing transient cluster tests. - -Persistent cluster -- Make sure async completion works correctly. -- InitialStatus protoocl etc. to support persistent start-up (existing code) -- cluster restart from store: stores not identical. Load one, update the rest. - - assign cluster ID's to messages recovered from store, don't replicate. - -Improved update protocol -- per-queue, less stalling, bounded catch-up. - -* Task list - -** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. - -NOTE: as implementation questions arise, take the easiest option and make -a note for later optimization/improvement. - -*** Tests -- python test: 4 senders, numbered messages, 4 receivers, verify message set. -- acquire then release messages: verify can be dequeued on any member -- acquire then kill broker: verify can be dequeued other members. -- acquire then reject: verify goes on alt-exchange once only. - -*** DONE broker::Cluster interface and call points. - -Initial interface commited. - -*** Main classes - -BrokerHandler: -- implements broker::Cluster intercept points. -- sends mcast events to inform cluster of local actions. -- thread safe, called in connection threads. - -LocalMessageMap: -- Holds local messages while they are being enqueued. -- thread safe: called by both BrokerHandler and MessageHandler - -MessageHandler: -- handles delivered mcast messages related to messages. -- initiates local actions in response to mcast events. -- thread unsafe, only called in deliver thread. -- maintains view of cluster state regarding messages. - -QueueOwnerHandler: -- handles delivered mcast messages related to queue consumer ownership. -- thread safe, called in deliver, connection and timer threads. -- maintains view of cluster state regarding queue ownership. - -cluster::Core: class to hold new cluster together (replaces cluster::Cluster) -- thread safe: manage state used by both MessageHandler and BrokerHandler - -The following code sketch illustrates only the "happy path" error handling -is omitted. 
- -*** BrokerHandler -Types: -- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; } -- struct - -NOTE: -- Messages on queues are identified by a queue name + a position. -- Messages being routed are identified by a sequence number. - -Members: -- thread_local bool noReplicate // suppress replication. -- thread_local bool isRouting // suppress operations while routing -- Message localMessage[SequenceNumber] // local messages being routed. -- thread_local SequenceNumber routingSequence - -NOTE: localMessage is also modified by MessageHandler. - -broker::Cluster intercept functions: - -routing(msg) - if noReplicate: return - # Supress everything except enqueues while we are routing. - # We don't want to replicate acquires & dequeues caused by an enqueu, - # e.g. removal of messages from ring/LV queues. - isRouting = true - -enqueue(qmsg): - if noReplicate: return - if routingSequence == 0 # thread local - routingSequence = nextRoutingSequence() - mcast create(encode(qmsg.msg),routingSeq) - mcast enqueue(qmsg.q,routingSeq) - -routed(msg): - if noReplicate: return - isRouting = false - -acquire(qmsg): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - if msg.id: mcast acquire(qmsg) - -release(QueuedMessage) - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - mcast release(qmsg) - -accept(QueuedMessage): - if noReplicate: return - if isRouting: return # Ignore while we are routing a message. - mcast accept(qmsg) - -reject(QueuedMessage): - isRejecting = true - mcast reject(qmsg) - -# FIXME no longer needed? -drop(QueuedMessage) - cleanup(qmsg) - -*** MessageHandler and mcast messages -Types: -- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; } -- struct QueueKey { MessageId id; QueueName q; } -- typedef map Queue -- struct Node { Message routing[SequenceNumber]; list acquired; } - -Members: -- QueueEntry enqueued[QueueKey] -- Node node[NodeId] - -Mcast messages in Message class: - -create(msg,seq) - if sender != self: node[sender].routing[seq] = decode(msg) - -enqueue(q,seq): - id = (sender,seq) - if sender == self: - enqueued[id,q] = (localMessage[seq], acquired=None) - else: - msg = sender.routing[seq] - enqueued[id,q] = (qmsg, acquired=None) - with noReplicate=true: qmsg = broker.getQueue(q).push(msg) - -routed(seq): - if sender == self: localMessage.erase(msg.id.seq) - else: sender.routing.erase(seq) - -acquire(id,q): - enqueued[id,q].acquired = sender - node[sender].acquired.push_back((id,q)) - if sender != self: - with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q]) - -release(id,q) - enqueued[id,q].acquired = None - node[sender].acquired.erase((id,q)) - if sender != self - with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q]) - -reject(id,q): - sender.routing[id] = enqueued[id,q] # prepare for re-queueing - -rejected(id,q) - sender.routing.erase[id] - -dequeue(id,q) - entry = enqueued[id,q] - enqueued.erase[id,q] - node[entry.acquired].acquired.erase(id,q) - if sender != self: - with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg) - -member m leaves cluster: - for key in node[m].acquired: - release(key.id, key.q) - node.erase(m) - -*** Queue consumer locking - -When a queue is locked it does not deliver messages to its consumers. - -New broker::Queue functions: -- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit. 
-- startConsumers(): reset consumersStopped flag - -Implementation sketch, locking omitted: - -void Queue::stopConsumers() { - consumersStopped = true; - while (consumersBusy) consumersBusyMonitor.wait(); -} - -void Queue::startConsumers() { - consumersStopped = false; - listeners.notify(); -} - -bool Queue::dispatch(consumer) { - if (consumersStopped) return false; - ++consumersBusy; - do_regular_dispatch_body() - if (--consumersBusy == 0) consumersBusyMonitor.notify(); -} - -*** QueueOwnerHandler - -Invariants: -- Each queue is owned by at most one node at any time. -- Each node is interested in a set of queues at any given time. -- A queue is un-owned if no node is interested. - -The queue owner releases the queue when -- it loses interest i.e. queue has no consumers with credit. -- a configured time delay expires and there are other interested nodes. - -The owner mcasts release(q). On delivery the new queue owner is the -next node in node-id order (treating nodes as a circular list) -starting from the old owner that is interested in the queue. - -Queue consumers initially are stopped, only started when we get -ownership from the cluster. - -Thread safety: called by deliver, connection and timer threads, needs locking. - -Thread safe object per queue holding queue ownership status. -Called by deliver, connection and timer threads. - -class QueueOwnership { - bool owned; - Timer timer; - BrokerQueue q; - - drop(): # locked - if owned: - owned = false - q.stopConsumers() - mcast release(q.name, false) - timer.stop() - - take(): # locked - if not owned: - owned = true - q.startConsumers() - timer.start(timeout) - - timer.fire(): drop() -} - -Data Members, only modified/examined in deliver thread: -- typedef set ConsumerSet -- map consumers -- map owner - -Thread safe data members, accessed in connection threads (via BrokerHandler): -- map ownership - -Multicast messages in QueueOwner class: - -consume(q): - if sender==self and consumers[q].empty(): ownership[q].take() - consumers[q].insert(sender) - -release(q): - asssert(owner[q] == sender and owner[q] in consumers[q]) - owner[q] = circular search from sender in consumers[q] - if owner==self: ownership[q].take() - -cancel(q): - assert(queue[q].owner != sender) # sender must release() before cancel() - consumers[q].erase(sender) - -member-leaves: - for q in queue: if owner[q] = left: left.release(q) - -Need 2 more intercept points in broker::Cluster: - -consume(q,consumer,consumerCount) - Queue::consume() - if consumerCount == 1: mcast consume(q) - -cancel(q,consumer,consumerCount) - Queue::cancel() - if consumerCount == 0: - ownership[q].drop() - mcast cancel(q) - -#TODO: lifecycle, updating cluster data structures when queues are destroyed - -*** Increasing concurrency -The major performance limitation of the old cluster is that it does -everything in the single CPG deliver thread context. - -We can get additional concurrency by creating a thread context _per queue_ -for queue operations: enqueue, acquire, accept etc. - -We associate a PollableQueue of queue operations with each AMQP queue. -The CPG deliver thread would -- build messages and associate with cluster IDs. -- push queue ops to the appropriate PollableQueue to be dispatched the queues thread. - -Serializing operations on the same queue avoids contention, but takes advantage -of the independence of operations on separate queues. 
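A stand-alone sketch of that threading shape (the classes below are
invented for illustration and use the C++11 standard library; this is
not Qpid's PollableQueue): the deliver thread posts closures, each
broker queue gets its own serialized worker, and separate queues
proceed in parallel.

#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

class QueueStrand {
  public:
    QueueStrand() : worker_(&QueueStrand::run, this) {}
    ~QueueStrand() {
        { std::lock_guard<std::mutex> l(lock_); done_ = true; }
        wake_.notify_one();
        worker_.join();
    }
    void post(std::function<void()> op) {      // called from deliver thread
        { std::lock_guard<std::mutex> l(lock_); ops_.push_back(std::move(op)); }
        wake_.notify_one();
    }
  private:
    void run() {
        std::unique_lock<std::mutex> l(lock_);
        while (!done_ || !ops_.empty()) {
            if (ops_.empty()) { wake_.wait(l); continue; }
            std::function<void()> op = std::move(ops_.front());
            ops_.pop_front();
            l.unlock();
            op();                              // ops for one queue run in order
            l.lock();
        }
    }
    std::mutex lock_;
    std::condition_variable wake_;
    std::deque<std::function<void()> > ops_;
    bool done_ = false;
    std::thread worker_;                       // started last, joined in dtor
};

// One strand per AMQP queue; operations on different queues run concurrently.
class QueueDispatcher {
  public:
    void post(const std::string& queue, std::function<void()> op) {
        std::lock_guard<std::mutex> l(lock_);
        std::unique_ptr<QueueStrand>& s = strands_[queue];
        if (!s) s.reset(new QueueStrand);
        s->post(std::move(op));
    }
  private:
    std::mutex lock_;
    std::map<std::string, std::unique_ptr<QueueStrand> > strands_;
};

int main() {
    QueueDispatcher d;
    d.post("q1", []{ std::cout << "enqueue on q1\n"; });
    d.post("q2", []{ std::cout << "enqueue on q2\n"; });
    // Destructors drain each queue's remaining work and join the workers.
}

The CPG deliver thread would then call something like
dispatcher.post(queueName, op) for each decoded enqueue/acquire/dequeue
event, keeping per-queue ordering while letting queues scale across cores.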
- -*** Re-use of existing cluster code -- re-use Event -- re-use Multicaster -- re-use same PollableQueueSetup (may experiment later) -- new Core class to replace Cluster. -- keep design modular, keep threading rules clear. - -** TODO [#B] Large message replication. -Multicast should encode messages in fixed size buffers (64k)? -Can't assume we can send message in one chunk. -For 0-10 can use channel numbers & send whole frames packed into larger buffer. -** TODO [#B] Batch CPG multicast messages -The new cluster design involves a lot of small multicast messages, -they need to be batched into larger CPG messages for efficiency. -** TODO [#B] Genuine async completion -Replace current synchronous waiting implementation with genuine async completion. - -Test: enhance test_store.cpp to defer enqueueComplete till special message received. - -Async callback uses *requestIOProcessing* to queue action on IO thread. - -** TODO [#B] Async completion of accept when dequeue completes. -Interface is already there on broker::Message, just need to ensure -that store and cluster implementations call it appropriately. - -** TODO [#B] Replicate wiring. -From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command. - -** TODO [#B] New members joining - first pass - -Re-use update code from old cluster but don't replicate sessions & -connections. - -Need to extend it to send cluster IDs with messages. - -Need to replicate the queue ownership data as part of the update. - -** TODO [#B] Persistence support. -InitialStatus protoocl etc. to support persistent start-up (existing code) - -Only one broker recovers from store, update to others. - -Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover. - -** TODO [#B] Handle other ways that messages can leave a queue. - -Other ways (other than via a consumer) that messages are take off a queue. - -NOTE: Not controlled by queue lock, how to make them consistent? - -Target broker may not have all messages on other brokers for purge/destroy. -- Queue::move() - need to wait for lock? Replicate? -- Queue::get() - ??? -- Queue::purge() - replicate purge? or just delete what's on broker ? -- Queue::destroy() - messages to alternate exchange on all brokers.? - -Need to add callpoints & mcast messages to replicate these? - -** TODO [#B] Flow control for internal queues. - -Need to bound the size of internal queues: delivery and multicast. -- stop polling for read on client connections when we reach a bound. -- restart polling when we get back under it. - -That will stop local multicasting, we still have to deal with remote -multicasting (note existing cluster does not do this.) Something like: -- when over bounds multicast a flow-control event. -- on delivery of flow-control all members stop polling to read client connections -- when back under bounds send flow-control-end, all members resume -- if flow-controling member dies others resume - -** TODO [#B] Integration with transactions. -Do we want to replicate during transaction & replicate commit/rollback -or replicate only on commit? -No integration with DTX transactions. -** TODO [#B] Make new cluster work with replication exchange. -Possibly re-use some common logic. Replication exchange is like clustering -except over TCP. -** TODO [#B] Better concurrency, scalabiility on multi-cores. -Introduce PollableQueue of operations per broker queue. Queue up mcast -operations (enqueue, acquire, accept etc.) to be handled concurrently -on different queue. 
Performance testing to verify improved scalability. -** TODO [#C] Async completion for declare, bind, destroy queues and exchanges. -Cluster needs to complete these asynchronously to guarantee resources -exist across the cluster when the command completes. - -** TODO [#C] Allow non-replicated exchanges, queues. - -Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects. -- save replicated status to store. -- support in management tools. -Replicated exchange: replicate binds to replicated queues. -Replicated queue: replicate all messages. - -** TODO [#C] New members joining - improved. - -Replicate wiring like old cluster, stall for wiring but not for -messages. Update messages on a per-queue basis from back to front. - -Updater: -- stall & push wiring: declare exchanges, queues, bindings. -- start update iterator thread on each queue. -- unstall and process normally while iterator threads run. - -Update iterator thread: -- starts at back of updater queue, message m. -- send update_front(q,m) to updatee and advance towards front -- at front: send update_done(q) - -Updatee: -- stall, receive wiring, lock all queues, mark queues "updating", unstall -- update_front(q,m): push m to *front* of q -- update_done(q): mark queue "ready" - -Updatee cannot take the queue consume lock for a queue that is updating. -Updatee *can* push messages onto a queue that is updating. - -TODO: Is there any way to eliminate the stall for wiring? - -** TODO [#C] Refactoring of common concerns. - -There are a bunch of things that act as "Queue observers" with intercept -points in similar places. -- QueuePolicy -- QueuedEvents (async replication) -- MessageStore -- Cluster - -Look for ways to capitalize on the similarity & simplify the code. - -In particular QueuedEvents (async replication) strongly resembles -cluster replication, but over TCP rather than multicast. -** TODO [#C] Concurrency for enqueue events. -All enqueue events are being processed in the CPG deliver thread context which -serializes all the work. We only need ordering on a per queue basis, can we -enqueue in parallel on different queues and will that improve performance? -** TODO [#C] Handling immediate messages in a cluster -Include remote consumers in descision to deliver an immediate message? diff --git a/cpp/design_docs/new-cluster-design.txt b/cpp/design_docs/new-cluster-design.txt new file mode 100644 index 0000000000..7adb46fee3 --- /dev/null +++ b/cpp/design_docs/new-cluster-design.txt @@ -0,0 +1,435 @@ +-*-org-*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +* A new design for Qpid clustering. + +** Issues with current design. 
+ +The cluster is based on virtual synchrony: each broker multicasts +events and the events from all brokers are serialized and delivered in +the same order to each broker. + +In the current design raw byte buffers from client connections are +multicast, serialized and delivered in the same order to each broker. + +Each broker has a replica of all queues, exchanges, bindings and also +all connections & sessions from every broker. Cluster code treats the +broker as a "black box", it "plays" the client data into the +connection objects and assumes that by giving the same input, each +broker will reach the same state. + +A new broker joining the cluster receives a snapshot of the current +cluster state, and then follows the multicast conversation. + +*** Maintenance issues. + +The entire state of each broker is replicated to every member: +connections, sessions, queues, messages, exchanges, management objects +etc. Any discrepancy in the state that affects how messages are +allocated to consumers can cause an inconsistency. + +- Entire broker state must be faithfully updated to new members. +- Management model also has to be replicated. +- All queues are replicated, can't have unreplicated queues (e.g. for management) + +Events that are not deterministically predictable from the client +input data stream can cause inconsistencies. In particular use of +timers/timestamps require cluster workarounds to synchronize. + +A member that encounters an error which is not encounted by all other +members is considered inconsistent and will shut itself down. Such +errors can come from any area of the broker code, e.g. different +ACL files can cause inconsistent errors. + +The following areas required workarounds to work in a cluster: + +- Timers/timestamps in broker code: management, heartbeats, TTL +- Security: cluster must replicate *after* decryption by security layer. +- Management: not initially included in the replicated model, source of many inconsistencies. + +It is very easy for someone adding a feature or fixing a bug in the +standalone broker to break the cluster by: +- adding new state that needs to be replicated in cluster updates. +- doing something in a timer or other non-connection thread. + +It's very hard to test for such breaks. We need a looser coupling +and a more explicitly defined interface between cluster and standalone +broker code. + +*** Performance issues. + +Virtual synchrony delivers all data from all clients in a single +stream to each broker. The cluster must play this data thru the full +broker code stack: connections, sessions etc. in a single thread +context in order to get identical behavior on each broker. The cluster +has a pipelined design to get some concurrency but this is a severe +limitation on scalability in multi-core hosts compared to the +standalone broker which processes each connection in a separate thread +context. + +** A new cluster design. + +Clearly defined interface between broker code and cluster plug-in. + +Replicate queue events rather than client data. +- Broker behavior only needs to match per-queue. +- Smaller amount of code (queue implementation) that must behave predictably. +- Events only need be serialized per-queue, allows concurrency between queues + +Use a moving queue ownership protocol to agree order of dequeues. +No longer relies on identical state and lock-step behavior to cause +identical dequeues on each broker. + +Each queue has an associated thread-context. 
Events for a queue are executed +in that queues context, in parallel with events for other queues. + +*** Requirements + +The cluster must provide these delivery guarantees: + +- client sends transfer: message must be replicated and not lost even if the local broker crashes. +- client acquires a message: message must not be delivered on another broker while acquired. +- client accepts message: message is forgotten, will never be delivered or re-queued by any broker. +- client releases message: message must be re-queued on cluster and not lost. +- client rejects message: message must be dead-lettered or discarded and forgotten. +- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster. + +Each guarantee takes effect when the client receives a *completion* +for the associated command (transfer, acquire, reject, accept) + +*** Broker receiving messages + +On recieving a message transfer, in the connection thread we: +- multicast a message-received event. +- enqueue and complete the transfer when it is self-delivered. + +Other brokers enqueue the message when they recieve the message-received event. + +Enqueues are queued up with other queue operations to be executed in the +thread context associated with the queue. + +*** Broker sending messages: moving queue ownership + +Each queue is *owned* by at most one cluster broker at a time. Only +that broker may acquire or dequeue messages. The owner multicasts +notification of messages it acquires/dequeues to the cluster. +Periodically the owner hands over ownership to another interested +broker, providing time-shared access to the queue among all interested +brokers. + +We assume the same IO-driven dequeuing algorithm as the standalone +broker with one modification: queues can be "locked". A locked queue +is not available for dequeuing messages and will be skipped by the +output algorithm. + +At any given time only those queues owned by the local broker will be +unlocked. + +As messages are acquired/dequeued from unlocked queues by the IO threads +the broker multicasts acquire/dequeue events to the cluster. + +When an unlocked queue has no more consumers with credit, or when a +time limit expires, the broker relinquishes ownership by multicasting +a release-queue event, allowing another interested broker to take +ownership. + +*** Asynchronous completion of accept +### HERE +In acknowledged mode a message is not forgotten until it is accepted, +to allow for requeue on rejection or crash. The accept should not be +completed till the message has been forgotten. + +On receiving an accept the broker: +- dequeues the message from the local queue +- multicasts an "accept" event +- completes the accept asynchronously when the dequeue event is self delivered. + +NOTE: The message store does not currently implement asynchronous +completions of accept, this is a bug. + +** Inconsistent errors. + +The new design eliminates most sources of inconsistent errors +(connections, sessions, security, management etc.) The only points +where inconsistent errors can occur are at enqueue and dequeue (most +likely store-related errors.) + +The new design can use the exisiting error-handling protocol with one +major improvement: since brokers are no longer required to maintain +identical state they do not have to stall processing while an error is +being resolved. + +#TODO: The only source of dequeue errors is probably an unrecoverable journal failure. 
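The asynchronous completion of accept described above implies some
bookkeeping; a stand-alone sketch follows (class and member names are
assumptions): the connection thread registers a completion before
multicasting the accept, and the CPG deliver thread fires it when the
event is self-delivered.

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <utility>

class PendingAccepts {
  public:
    typedef std::pair<std::string, uint64_t> MessageId; // queue name + position
    typedef std::function<void()> Completion;

    // Connection thread: remember the completion before multicasting "accept".
    void add(const MessageId& id, Completion done) {
        std::lock_guard<std::mutex> l(lock_);
        pending_[id] = std::move(done);
    }

    // Deliver thread: the accept event we multicast has come back to us, so
    // the dequeue is agreed cluster-wide and the accept can now complete.
    void selfDelivered(const MessageId& id) {
        Completion done;
        {
            std::lock_guard<std::mutex> l(lock_);
            std::map<MessageId, Completion>::iterator i = pending_.find(id);
            if (i == pending_.end()) return;
            done = std::move(i->second);
            pending_.erase(i);
        }
        done();   // run outside the lock: sends the completion to the client
    }
  private:
    std::mutex lock_;
    std::map<MessageId, Completion> pending_;
};

int main() {
    PendingAccepts pending;
    PendingAccepts::MessageId id("my-queue", 42);
    pending.add(id, []{ std::cout << "accept completed to client\n"; });
    pending.selfDelivered(id);   // simulate CPG self-delivery
}

On the local broker the dequeue itself has already happened in the
connection thread; selfDelivered() only releases the client-visible
completion once the cluster has seen the event.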
+ +** Updating new members + +When a new member (the updatee) joins a cluster it needs to be brought +up to date with the rest of the cluster. An existing member (the +updater) sends an "update". + +In the old cluster design the update is a snapshot of the entire +broker state. To ensure consistency of the snapshot both the updatee +and the updater "stall" at the start of the update, i.e. they stop +processing multicast events and queue them up for processing when the +update is complete. This creates a back-log of work to get through, +which leaves them lagging behind the rest of the cluster till they +catch up (which is not guaranteed to happen in a bounded time.) + +With the new cluster design only exchanges, queues, bindings and +messages need to be replicated. + +Update of wiring (exchanges, queues, bindings) is the same as current +design. + +Update of messages is different: +- per-queue rather than per-broker, separate queues can be updated in parallel. +- updates queues in reverse order to eliminate unbounded catch-up +- does not require updater & updatee to stall during update. + +Replication events, multicast to cluster: +- enqueue(q,m): message m pushed on back of queue q . +- acquire(q,m): mark m acquired +- dequeue(q,m): forget m. +Messages sent on update connection: +- update_front(q,m): during update, receiver pushes m to *front* of q +- update_done(q): during update, update of q is complete. + +Updater: +- when updatee joins set iterator i = q.end() +- while i != q.begin(): --i; send update_front(q,*i) to updatee +- send update_done(q) to updatee + +Updatee: +- q initially in locked state, can't dequeue locally. +- start processing replication events for q immediately (enqueue, dequeue, acquire etc.) +- receive update_front(q,m): q.push_front(m) +- receive update_done(q): q can be unlocked for local dequeing. + +Benefits: +- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated. +- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update. +- During update consumers actually help by removing messages before they need to be updated. +- Needs no separate "work to do" queue, only the broker queues themselves. + +# TODO how can we recover from updater crashing before update complete? +# Clear queues that are not updated & send request for udpates on those queues? + +# TODO updatee may receive a dequeue for a message it has not yet seen, needs +# to hold on to that so it can drop the message when it is seen. +# Similar problem exists for wiring? + +** Cluster API + +The new cluster API is similar to the MessageStore interface. +(Initially I thought it would be an extension of the MessageStore interface, +but as the design develops it seems better to make it a separate interface.) + +The cluster interface captures these events: +- wiring changes: queue/exchange declare/bind +- message enqueued/acquired/released/rejected/dequeued. + +The cluster will require some extensions to the Queue: +- Queues can be "locked", locked queues are ignored by IO-driven output. +- Cluster must be able to apply queue events from the cluster to a queue. + These appear to fit into existing queue operations. + +** Maintainability + +This design gives us more robust code with a clear and explicit interfaces. + +The cluster depends on specific events clearly defined by an explicit +interface. 
Provided the semantics of this interface are not violated, +the cluster will not be broken by changes to broker code. + +The cluster no longer requires identical processing of the entire +broker stack on each broker. It is not affected by the details of how +the broker allocates messages. It is independent of the +protocol-specific state of connections and sessions and so is +protected from future protocol changes (e.g. AMQP 1.0) + +A number of specific ways the code will be simplified: +- drop code to replicate management model. +- drop timer workarounds for TTL, management, heartbeats. +- drop "cluster-safe assertions" in broker code. +- drop connections, sessions, management from cluster update. +- drop security workarounds: cluster code now operates after message decoding. +- drop connection tracking in cluster code. +- simper inconsistent-error handling code, no need to stall. + +** Performance + +The only way to verify the relative performance of the new design is +to prototype & profile. The following points suggest the new design +may scale/perform better: + +Some work moved from virtual synchrony thread to connection threads: +- All connection/session logic moves to connection thread. +- Exchange routing logic moves to connection thread. +- On local broker dequeueing is done in connection thread +- Local broker dequeue is IO driven as for a standalone broker. + +For queues with all consumers on a single node dequeue is all +IO-driven in connection thread. Pay for time-sharing only if queue has +consumers on multiple brokers. + +Doing work for different queues in parallel scales on multi-core boxes when +there are multiple queues. + +One difference works against performance, thre is an extra +encode/decode. The old design multicasts raw client data and decodes +it in the virtual synchrony thread. The new design would decode +messages in the connection thread, re-encode them for multicast, and +decode (on non-local brokers) in the virtual synchrony thread. There +is extra work here, but only in the *connection* thread: on a +multi-core machine this happens in parallel for every connection, so +it probably is not a bottleneck. There may be scope to optimize +decode/re-encode by re-using some of the original encoded data, this +could also benefit the stand-alone broker. + +** Asynchronous queue replication + +The existing "asynchronous queue replication" feature maintains a +passive backup passive backup of queues on a remote broker over a TCP +connection. + +The new cluster replication protocol could be re-used to implement +asynchronous queue replication: its just a special case where the +active broker is always the queue owner and the enqueue/dequeue +messages are sent over a TCP connection rather than multicast. + +The new update update mechanism could also work with 'asynchronous +queue replication', allowing such replication (over a TCP connection +on a WAN say) to be initiated after the queue had already been created +and been in use (one of the key missing features). + +** Increasing Concurrency and load sharing + +The current cluster is bottlenecked by processing everything in the +CPG deliver thread. By removing the need for identical operation on +each broker, we open up the possiblility of greater concurrency. + +Handling multicast enqueue, acquire, accpet, release etc: concurrency +per queue. Operatons on different queues can be done in different +threads. 
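One way to picture the re-use suggested under "Asynchronous queue
replication" above (a sketch under assumed names, not an existing Qpid
interface): the broker-side code emits the same enqueue/dequeue events
through an abstract sink, and only the sink decides whether they go to
CPG multicast or to a TCP backup link.

#include <cstdint>
#include <iostream>
#include <string>

struct ReplicationEventSink {
    virtual ~ReplicationEventSink() {}
    virtual void enqueue(const std::string& q, uint64_t pos,
                         const std::string& body) = 0;
    virtual void dequeue(const std::string& q, uint64_t pos) = 0;
};

// Active/active cluster: events are multicast to all members via CPG.
struct MulticastSink : ReplicationEventSink {
    void enqueue(const std::string& q, uint64_t pos, const std::string&) override {
        std::cout << "mcast enqueue " << q << " " << pos << "\n";
    }
    void dequeue(const std::string& q, uint64_t pos) override {
        std::cout << "mcast dequeue " << q << " " << pos << "\n";
    }
};

// Asynchronous queue replication: the same events over TCP to a passive backup.
struct TcpBackupSink : ReplicationEventSink {
    void enqueue(const std::string& q, uint64_t pos, const std::string&) override {
        std::cout << "tcp enqueue " << q << " " << pos << "\n";
    }
    void dequeue(const std::string& q, uint64_t pos) override {
        std::cout << "tcp dequeue " << q << " " << pos << "\n";
    }
};

int main() {
    MulticastSink mcast;
    TcpBackupSink backup;
    ReplicationEventSink* sinks[] = { &mcast, &backup };
    for (ReplicationEventSink* s : sinks) {   // same producer code, two transports
        s->enqueue("my-queue", 7, "payload");
        s->dequeue("my-queue", 7);
    }
}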
+ +The new design does not force each broker to do all the work in the +CPG thread so spreading load across cluster members should give some +scale-up. + +** Misc outstanding issues & notes + +Replicating wiring +- Need async completion of wiring commands? +- qpid.sequence_counter: need extra work to support in new design, do we care? + +Cluster+persistence: +- finish async completion: dequeue completion for store & cluster +- cluster restart from store: clean stores *not* identical, pick 1, all others update. +- need to generate cluster ids for messages recovered from store. + +Live updates: we don't need to stall brokers during an update! +- update on queue-by-queue basis. +- updatee locks queues during update, no dequeue. +- update in reverse: don't update messages dequeued during update. +- updatee adds update messages at front (as normal), replicated messages at back. +- updater starts from back, sends "update done" when it hits front of queue. + +Flow control: need to throttle multicasting +1. bound the number of outstanding multicasts. +2. ensure the entire cluster keeps up, no unbounded "lag" +The existing design uses read-credit to solve 1., and does not solve 2. +New design should stop reading on all connections while flow control +condition exists? + +Can federation also be unified, at least in configuration? + +Consider queues (and exchanges?) as having "reliability" attributes: +- persistent: is the message stored on disk. +- backed-up (to another broker): active/passive async replication. +- replicated (to a cluster): active/active multicast replication to cluster. +- federated: federation link to a queue/exchange on another broker. + +"Reliability" seems right for the first 3 but not for federation, is +there a better term? + +Clustering and scalability: new design may give us the flexibility to +address scalability as part of cluster design. Think about +relationship to federation and "fragmented queues" idea. + +* Design debates/descisions + +** Active/active vs. active passive + +An active-active cluster can be used in an active-passive mode. In +this mode we would like the cluster to be as efficient as a strictly +active-passive implementation. + +An active/passive implementation allows some simplifications over active/active: +- drop Queue ownership and locking +- don't need to replicate message acquisition. +- can do immediate local enqueue and still guarantee order. + +Active/passive introduces a few extra requirements: +- Exactly one broker hast to take over if primary fails. +- Passive members must refuse client connections. +- On failover, clients must re-try all known addresses till they find the active member. + +Active/active benefits: +- A broker failure only affects the subset of clients connected to that broker. +- Clients can switch to any other broker on failover +- Backup brokers are immediately available on failover. +- Some load sharing: reading from client + multicast only done on direct node. + +Active/active drawbacks: +- Co-ordinating message acquisition may impact performance (not tested) +- Code may be more complex that active/passive. + +Active/passive benefits: +- Don't need message allocation strategy, can feed consumers at top speed. +- Code may be simpler than active/active. + +Active/passive drawbacks: +- All clients on one node so a failure affects every client in the system. +- After a failure there is a "reconnect storm" as every client reconnects to the new active node. 
+- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary. +- Clients must find the single active node, may involve multiple connect attempts. + +** Total ordering. + +Initial thinking: allow message ordering to differ between brokers. +New thinking: use CPG total ordering, get identical ordering on all brokers. +- Allowing variation in order introduces too much chance of unexpected behavior. +- Usign total order allows other optimizations, see Message Identifiers below. + +** Message identifiers. + +Initial thinking: message ID = CPG node id + 64 bit sequence number. +This involves a lot of mapping between cluster IDs and broker messsages. + +New thinking: message ID = queue name + queue position. +- Removes most of the mapping and memory management for cluster code. +- Requires total ordering of messages (see above) + +** Message rejection + +Initial thinking: add special reject/rejected points to cluster interface so +rejected messages could be re-queued without multicast. + +New thinking: treat re-queueing after reject as entirely new message. +- Simplifies cluster interface & implementation +- Not on the critical path. diff --git a/cpp/design_docs/new-cluster-plan.txt b/cpp/design_docs/new-cluster-plan.txt new file mode 100644 index 0000000000..b5cd353fc0 --- /dev/null +++ b/cpp/design_docs/new-cluster-plan.txt @@ -0,0 +1,473 @@ +-*-org-*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +Notes on new cluster implementation. See also: new-cluster-design.txt + +* Implementation plan. + +Co-existence with old cluster code and tests: +- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster. +- Double up tests with old version/new version as the new code develops. + +Minimal POC for message delivery & perf test. +- no wiring replication, no updates, no failover, no persistence, no async completion. +- just implement publish and acquire/dequeue locking protocol. +- optimize the special case where all consumers are on the same node. +- measure performance: compare active-passive and active-active modes of use. + +Full implementation of transient cluster +- Update (based on existing update), async completion etc. +- Passing all existing transient cluster tests. + +Persistent cluster +- Make sure async completion works correctly. +- InitialStatus protoocl etc. to support persistent start-up (existing code) +- cluster restart from store: stores not identical. Load one, update the rest. + - assign cluster ID's to messages recovered from store, don't replicate. + +Improved update protocol +- per-queue, less stalling, bounded catch-up. + +* Task list + +** TODO [#A] Minimal POC: publish/acquire/dequeue protocol. 
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify they can be dequeued on any member.
+- acquire then kill broker: verify they can be dequeued on other members.
+- acquire then reject: verify the message goes on the alt-exchange once only.
+
+*** DONE broker::Cluster interface and call points.
+
+Initial interface committed.
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and MessageHandler.
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold the new cluster together (replaces cluster::Cluster)
+- thread safe: manages state used by both MessageHandler and BrokerHandler.
+
+The following code sketch illustrates only the "happy path"; error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
+
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
+
+Members:
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing.
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
+
+NOTE: localMessage is also modified by MessageHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg):
+  if noReplicate: return
+  # Suppress everything except enqueues while we are routing.
+  # We don't want to replicate acquires & dequeues caused by an enqueue,
+  # e.g. removal of messages from ring/LV queues.
+  isRouting = true
+
+enqueue(qmsg):
+  if noReplicate: return
+  if routingSequence == 0: # thread local
+    routingSequence = nextRoutingSequence()
+    mcast create(encode(qmsg.msg), routingSequence)
+  mcast enqueue(qmsg.q, routingSequence)
+
+routed(msg):
+  if noReplicate: return
+  isRouting = false
+
+acquire(qmsg):
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  if msg.id: mcast acquire(qmsg)
+
+release(qmsg):
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  mcast release(qmsg)
+
+accept(qmsg):
+  if noReplicate: return
+  if isRouting: return # Ignore while we are routing a message.
+  mcast accept(qmsg)
+
+reject(qmsg):
+  isRejecting = true
+  mcast reject(qmsg)
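+
+The intercept functions above rely on per-thread state (noReplicate,
+isRouting, routingSequence) to suppress re-replication while the cluster
+itself is applying a delivered event, or while a message is being routed.
+A minimal, hypothetical C++ rendering of that pattern (names are
+illustrative, not the actual broker types):
+
+#include <stdint.h>
+
+// Per-thread replication flags; connection threads and the deliver thread
+// each see their own copy.
+struct ReplicationFlags {
+    bool noReplicate = false;      // apply locally, do not multicast again
+    bool isRouting = false;        // ignore acquires/dequeues caused by an enqueue
+    uint64_t routingSequence = 0;  // sequence number of the message being routed
+};
+
+thread_local ReplicationFlags tssFlags;
+
+// RAII guard used when applying a delivered mcast event to the local broker:
+// any intercepts triggered by that local action are not multicast again.
+class ScopedNoReplicate {
+    bool saved;
+  public:
+    ScopedNoReplicate() : saved(tssFlags.noReplicate) { tssFlags.noReplicate = true; }
+    ~ScopedNoReplicate() { tssFlags.noReplicate = saved; }
+};
+
+This is the same idea as the "with noReplicate=true:" blocks in the
+MessageHandler sketch below.
+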
+# FIXME no longer needed?
+drop(qmsg):
+  cleanup(qmsg)
+
+*** MessageHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq):
+  if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+  id = (sender,seq)
+  if sender == self:
+    enqueued[id,q] = (localMessage[seq], acquired=None)
+  else:
+    msg = sender.routing[seq]
+    with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+    enqueued[id,q] = (qmsg, acquired=None)
+
+routed(seq):
+  if sender == self: localMessage.erase(seq)
+  else: sender.routing.erase(seq)
+
+acquire(id,q):
+  enqueued[id,q].acquired = sender
+  node[sender].acquired.push_back((id,q))
+  if sender != self:
+    with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q):
+  enqueued[id,q].acquired = None
+  node[sender].acquired.erase((id,q))
+  if sender != self:
+    with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+  sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q):
+  sender.routing.erase(id)
+
+dequeue(id,q):
+  entry = enqueued[id,q]
+  enqueued.erase((id,q))
+  node[entry.acquired].acquired.erase((id,q))
+  if sender != self:
+    with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+  for key in node[m].acquired:
+    release(key.id, key.q)
+  node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag.
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+  consumersStopped = true;
+  while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+  consumersStopped = false;
+  listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+  if (consumersStopped) return false;
+  ++consumersBusy;
+  bool result = do_regular_dispatch_body();
+  if (--consumersBusy == 0) consumersBusyMonitor.notify();
+  return result;
+}
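+
+With the omitted locking filled in, the same stop/start logic might look
+like the following hedged sketch, using std::mutex and
+std::condition_variable as stand-ins for the broker's own monitor class
+(the real broker::Queue members and names may differ):
+
+#include <mutex>
+#include <condition_variable>
+#include <functional>
+
+class ConsumerGate {
+    std::mutex lock;
+    std::condition_variable busyCond;
+    bool consumersStopped = false;
+    int consumersBusy = 0;
+  public:
+    // Block new dispatches and wait for currently busy consumers to finish.
+    void stopConsumers() {
+        std::unique_lock<std::mutex> l(lock);
+        consumersStopped = true;
+        while (consumersBusy > 0) busyCond.wait(l);
+    }
+    // Allow dispatching again; the queue would also notify its listeners here.
+    void startConsumers() {
+        std::lock_guard<std::mutex> l(lock);
+        consumersStopped = false;
+    }
+    // Returns false without dispatching if consumers are stopped.
+    bool dispatch(const std::function<bool()>& doDispatch) {
+        {
+            std::lock_guard<std::mutex> l(lock);
+            if (consumersStopped) return false;
+            ++consumersBusy;
+        }
+        bool result = doDispatch();  // regular dispatch body, run outside the lock
+        {
+            std::lock_guard<std::mutex> l(lock);
+            if (--consumersBusy == 0) busyCond.notify_all();
+        }
+        return result;
+    }
+};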
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when:
+- it loses interest, i.e. the queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node in node-id order (treating nodes as a circular list)
+starting from the old owner that is interested in the queue.
+
+Queue consumers are initially stopped, and only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+  bool owned;
+  Timer timer;
+  BrokerQueue q;
+
+  drop(): # locked
+    if owned:
+      owned = false
+      q.stopConsumers()
+      mcast release(q.name, false)
+      timer.stop()
+
+  take(): # locked
+    if not owned:
+      owned = true
+      q.startConsumers()
+      timer.start(timeout)
+
+  timer.fire(): drop()
+}
+
+Data members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+  if sender == self and consumers[q].empty(): ownership[q].take()
+  consumers[q].insert(sender)
+
+release(q):
+  assert(owner[q] == sender and owner[q] in consumers[q])
+  owner[q] = circular search from sender in consumers[q]
+  if owner[q] == self: ownership[q].take()
+
+cancel(q):
+  assert(owner[q] != sender) # sender must release() before cancel()
+  consumers[q].erase(sender)
+
+member-leaves:
+  for q in owner: if owner[q] == left: release(q) # as if mcast by the leaving node
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+  if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+  if consumerCount == 0:
+    ownership[q].drop()
+    mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed.
+
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would:
+- build messages and associate them with cluster IDs.
+- push queue ops to the appropriate PollableQueue to be dispatched in that queue's thread.
+
+Serializing operations on the same queue avoids contention on that queue,
+while the independence of separate queues lets them be processed in parallel.
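+
+A hedged, stand-alone sketch of that per-queue dispatch pattern follows. It
+is not the existing qpid::sys::PollableQueue interface; the class, its names
+and the scheduling hook are assumptions, intended only to show per-queue
+serialization with cross-queue parallelism:
+
+#include <deque>
+#include <functional>
+#include <mutex>
+
+class PerQueueOps {
+    std::mutex lock;
+    std::deque<std::function<void()> > ops;
+    bool scheduled = false;   // true while some worker thread owns this queue
+  public:
+    // Called in the CPG deliver thread: queue the operation and, if this
+    // queue is idle, hand it to a worker via the supplied scheduling hook.
+    void push(std::function<void()> op,
+              const std::function<void(PerQueueOps*)>& schedule) {
+        bool needSchedule = false;
+        {
+            std::lock_guard<std::mutex> l(lock);
+            ops.push_back(op);
+            if (!scheduled) { scheduled = true; needSchedule = true; }
+        }
+        if (needSchedule) schedule(this);
+    }
+    // Called in a worker thread: drain this queue's operations in FIFO order.
+    void run() {
+        for (;;) {
+            std::function<void()> op;
+            {
+                std::lock_guard<std::mutex> l(lock);
+                if (ops.empty()) { scheduled = false; return; }
+                op = ops.front();
+                ops.pop_front();
+            }
+            op();   // e.g. apply an enqueue/acquire/dequeue to the broker queue
+        }
+    }
+};
+
+The deliver thread then stays cheap: it only decodes the event and pushes a
+closure; the actual broker-queue work happens in the per-queue context.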
+
+*** Re-use of existing cluster code
+- re-use Event.
+- re-use Multicaster.
+- re-use same PollableQueue setup (may experiment later).
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Multicast should encode messages in fixed size buffers (64k)?
+Can't assume we can send a message in one chunk.
+For 0-10 can use channel numbers & send whole frames packed into a larger buffer.
+** TODO [#B] Batch CPG multicast messages.
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion.
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass.
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code).
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are taken off a queue.
+
+NOTE: Not controlled by the queue lock, how to make them consistent?
+
+Target broker may not have all messages on other brokers for purge/destroy.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on the broker?
+- Queue::destroy() - send messages to the alternate exchange on all brokers?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting; we still have to deal with remote
+multicasting (note: the existing cluster does not do this). Something like:
+- when over bounds, multicast a flow-control event.
+- on delivery of flow-control, all members stop polling to read client connections.
+- when back under bounds, send flow-control-end, all members resume.
+- if the flow-controlling member dies, others resume.
+
+** TODO [#B] Integration with transactions.
+Do we want to replicate during the transaction & replicate commit/rollback,
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#B] Better concurrency, scalability on multi-cores.
+Introduce PollableQueue of operations per broker queue. Queue up mcast
+operations (enqueue, acquire, accept etc.) to be handled concurrently
+on different queues. Performance testing to verify improved scalability.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like the old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start an update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at the back of the updater's queue, message m.
+- sends update_front(q,m) to the updatee and advances towards the front.
+- at the front: sends update_done(q).
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall.
+- update_front(q,m): push m to the *front* of q.
+- update_done(q): mark the queue "ready".
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
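+
+To make the ordering concrete: because the iterator sends messages from back
+to front and the updatee pushes each one onto the *front* of its queue, the
+updatee ends up with the same front-to-back order as the updater. A hedged
+sketch of one iterator pass (the snapshot and send helpers are assumed, not
+existing code; the real iterator would walk the live queue and skip messages
+dequeued during the update):
+
+#include <string>
+#include <vector>
+
+struct Message;   // placeholder for the broker message type
+
+// 'snapshot' holds the updater's queue contents in front-to-back order.
+void updateQueue(const std::string& q,
+                 const std::vector<Message*>& snapshot,
+                 void (*sendUpdateFront)(const std::string& q, Message* m),
+                 void (*sendUpdateDone)(const std::string& q))
+{
+    // Walk from the back towards the front; the updatee pushes each message
+    // to the front of its copy of the queue, reproducing the original order.
+    for (std::vector<Message*>::const_reverse_iterator i = snapshot.rbegin();
+         i != snapshot.rend(); ++i)
+        sendUpdateFront(q, *i);
+    sendUpdateDone(q);   // updatee marks the queue "ready"
+}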
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "queue observers" with intercept
+points in similar places:
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code
+(one possible shape for a shared interface is sketched at the end of this file).
+
+In particular, QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are being processed in the CPG deliver thread context,
+which serializes all the work. We only need ordering on a per-queue basis;
+can we enqueue in parallel on different queues, and will that improve performance?
+** TODO [#C] Handling immediate messages in a cluster.
+Include remote consumers in the decision to deliver an immediate message?
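+
+As referenced under "Refactoring of common concerns" above, one possible
+shape for that shared interface. This is illustrative only, not an existing
+qpid class; the callback set simply mirrors the intercept points used in the
+sketches earlier in this file:
+
+struct QueuedMessage;   // broker queue entry, as in the sketches above
+
+class QueueObserver {
+  public:
+    virtual ~QueueObserver() {}
+    virtual void enqueued(const QueuedMessage&) = 0;  // message put on the queue
+    virtual void acquired(const QueuedMessage&) = 0;  // message acquired by a consumer
+    virtual void requeued(const QueuedMessage&) = 0;  // message released / re-queued
+    virtual void dequeued(const QueuedMessage&) = 0;  // message removed for good
+};
+
+Each broker queue would hold a list of observers and invoke them at the
+existing call points, so QueuePolicy, QueuedEvents, MessageStore and the
+cluster plugin could share one mechanism instead of four separate sets of hooks.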