| author | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
| commit | a08d54e27d4e91b52c5979cc566ab3e933878983 | |
| tree | 7f57ad88051e4a02f52d4bdf395968549e24f57a | cpp/src/qpid/cluster |
| parent | 8e53bc375ef2bfb4b05cc32b4a8c0042d95b9ec2 | |
| download | qpid-python-a08d54e27d4e91b52c5979cc566ab3e933878983.tar.gz | |
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt.
qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies
the broker makes the expected calls on broker::Cluster in various situations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1023966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/new-cluster-design.txt | 3 |
| -rw-r--r-- | cpp/src/qpid/cluster/new-cluster-plan.txt | 439 |
2 files changed, 442 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt
index 392de890c3..8ee740372d 100644
--- a/cpp/src/qpid/cluster/new-cluster-design.txt
+++ b/cpp/src/qpid/cluster/new-cluster-design.txt
@@ -75,6 +75,8 @@
 Use a moving queue ownership protocol to agree order of dequeues, rather
 than relying on identical state and lock-step behavior to cause identical
 dequeues on each broker.
+Clearly defined interface between broker code and cluster plug-in.
+
 *** Requirements
 
 The cluster must provide these delivery guarantees:
@@ -365,3 +367,4 @@ there a better term?
 Clustering and scalability: new design may give us the flexibility
 to address scalability as part of cluster design. Think about
 relationship to federation and "fragmented queues" idea.
+

diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt
new file mode 100644
index 0000000000..57c1241607
--- /dev/null
+++ b/cpp/src/qpid/cluster/new-cluster-plan.txt
@@ -0,0 +1,439 @@

-*-org-*-
Notes on the new cluster implementation. See also: new-cluster-design.txt.

* Implementation plan.

Co-existence with old cluster code and tests:
- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
- Double up tests with old version/new version as the new code develops.

Minimal POC for message delivery & perf test.
- No wiring replication, no updates, no failover, no persistence, no async completion.
- Just implement the publish and acquire/dequeue locking protocol.
- Measure performance.

Full implementation of transient cluster
- Update (based on existing update), async completion etc.
- Passing all existing transient cluster tests.

Persistent cluster
- Make sure async completion works correctly.
- InitialStatus protocol etc. to support persistent start-up (existing code).
- Cluster restart from store: stores are not identical. Load one, update the rest.
  - Assign cluster IDs to messages recovered from store, don't replicate.

Improved update protocol
- Per-queue, less stalling, bounded catch-up.

* Task list

** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.

NOTE: as implementation questions arise, take the easiest option and make
a note for later optimization/improvement.

*** Tests
- Python test: 4 senders, numbered messages, 4 receivers, verify the message set.
- Acquire then release messages: verify they can be dequeued on any member.
- Acquire then kill the broker: verify the messages can be dequeued on other members.
- Acquire then reject: verify the message goes to the alt-exchange once only.

*** TODO broker::Cluster interface and call points.

Initial draft is committed.

Issues to review:

Queue API: internal classes like RingQueuePolicy use Queue::acquire/dequeue
when messages are pushed. How to reconcile this with queue ownership?

Rejecting messages: if there is an alternate exchange, where do we do the
re-routing? On the origin broker or on all brokers?

Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
Intercepting client actions on the queue vs. internal actions
(e.g. ring policy).

*** Main classes

BrokerHandler:
- implements broker::Cluster intercept points.
- sends mcast events to inform the cluster of local actions.
- thread safe, called in connection threads.

LocalMessageMap:
- holds local messages while they are being enqueued.
- thread safe: called by both BrokerHandler and DeliverHandler.

MessageHandler:
- handles delivered mcast messages related to messages.
- initiates local actions in response to mcast events.
- thread unsafe, only called in the deliver thread.
- maintains a view of cluster state regarding messages.

QueueOwnerHandler:
- handles delivered mcast messages related to queue consumer ownership.
- thread safe, called in deliver, connection and timer threads.
- maintains a view of cluster state regarding queue ownership.

cluster::Core: class to hold the new cluster together (replaces cluster::Cluster).
- thread safe: manages state used by both DeliverHandler and BrokerHandler.
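Purely as an illustration of the call points named in these notes, the
interface might take a shape like the following. Names and signatures here
follow this document, not the committed header, so treat it as a sketch
only:

#include <cstddef>
#include <string>

struct Message;        // stand-in for the broker message type
struct QueuedMessage;  // message + queue name + position, as described below

class Cluster {
  public:
    virtual ~Cluster() {}
    // Called around routing of a message on the local broker.
    virtual void routing(const Message&) = 0;
    virtual void routed(const Message&) = 0;
    // Called as messages enter and leave queues.
    virtual void enqueue(QueuedMessage&) = 0;
    virtual void acquire(const QueuedMessage&) = 0;
    virtual void release(const QueuedMessage&) = 0;
    virtual void accept(const QueuedMessage&) = 0;
    virtual void reject(const QueuedMessage&) = 0;
    virtual void rejected(const QueuedMessage&) = 0;
    virtual void dequeue(const QueuedMessage&) = 0;
    // Two more intercept points proposed under QueueOwnerHandler below.
    virtual void consume(const std::string& q, std::size_t consumerCount) = 0;
    virtual void cancel(const std::string& q, std::size_t consumerCount) = 0;
};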
The following code sketches illustrate only the "happy path"; error
handling is omitted.

*** BrokerHandler
Types:
- struct QueuedMessage { Message msg; QueueName q; Position pos; }
- SequenceNumber: 64-bit sequence number to identify messages.
- NodeId: 64-bit CPG node-id, identifies a member of the cluster.
- struct MessageId { NodeId node; SequenceNumber seq; }

Members:
- atomic<SequenceNumber> sequence   // sequence number for message IDs.
- thread_local bool noReplicate     // suppress replication.
- thread_local bool isRouting       // suppress operations while routing.
- thread_local bool isRejecting     // set between reject() and rejected().
- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.

NOTE: localMessage is also modified by DeliverHandler.

broker::Cluster intercept functions:

routing(msg):
    if noReplicate: return
    # Suppress everything except enqueues while we are routing.
    # We don't want to replicate acquires & dequeues caused by an enqueue,
    # e.g. removal of messages from ring/LV queues.
    isRouting = true

enqueue(qmsg):
    if noReplicate: return
    if !qmsg.msg.id:
        seq = sequence++
        qmsg.msg.id = (self,seq)
        localMessage[seq] = qmsg
        mcast create(encode(qmsg.msg),seq)
    mcast enqueue(qmsg.q,qmsg.msg.id.seq)

routed(msg):
    if noReplicate: return
    if msg.id: mcast routed(msg.id.seq)
    isRouting = false

acquire(qmsg):
    if noReplicate: return
    if isRouting: return  # Ignore while we are routing a message.
    if qmsg.msg.id: mcast acquire(qmsg.msg.id, qmsg.q)

release(qmsg):
    if noReplicate: return
    if isRouting: return  # Ignore while we are routing a message.
    if qmsg.msg.id: mcast release(qmsg.msg.id, qmsg.q)

accept(qmsg):
    if noReplicate: return
    if isRouting: return  # Ignore while we are routing a message.
    if qmsg.msg.id: mcast dequeue(qmsg.msg.id, qmsg.q)

reject(qmsg):
    isRejecting = true
    if qmsg.msg.id: mcast reject(qmsg.msg.id, qmsg.q)

rejected(qmsg):
    isRejecting = false
    mcast dequeue(qmsg.msg.id, qmsg.q)

dequeue(qmsg):
    # No mcast in dequeue, only used for local cleanup of resources.
    # E.g. messages that are replaced on an LVQ are dequeued without being
    # accepted or rejected. dequeue is called with the queue lock held.
    # FIXME revisit - move it out of the queue lock.
    cleanup(qmsg)
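The identifier plumbing in the Members list above could look like this in
C++. This is illustrative only: std::atomic and thread_local stand in for
whatever primitives the broker actually uses, and nextId() is a
hypothetical helper, not a committed name.

#include <atomic>
#include <cstdint>

typedef std::uint64_t SequenceNumber;  // 64-bit per-node message sequence
typedef std::uint64_t NodeId;          // 64-bit CPG node id

struct MessageId { NodeId node; SequenceNumber seq; };

class BrokerHandler {
    std::atomic<SequenceNumber> sequence;  // shared by all connection threads
    static thread_local bool noReplicate;  // suppress replication of local ops
    static thread_local bool isRouting;    // suppress ops while routing
  public:
    BrokerHandler() : sequence(0) {}
    // Allocate the next (node, sequence) message id; safe from any thread.
    MessageId nextId(NodeId self) {
        MessageId id = { self, sequence++ };
        return id;
    }
};
thread_local bool BrokerHandler::noReplicate = false;
thread_local bool BrokerHandler::isRouting = false;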
*** DeliverHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
- struct QueueKey { MessageId id; QueueName q; }
- typedef map<QueueKey, QueueEntry> Queue
- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }

Members:
- QueueEntry enqueued[QueueKey]
- Node node[NodeId]

Mcast messages in the Message class:

create(msg,seq):
    if sender != self: node[sender].routing[seq] = decode(msg)

enqueue(q,seq):
    id = (sender,seq)
    if sender == self:
        enqueued[id,q] = (localMessage[seq], acquired=None)
    else:
        msg = node[sender].routing[seq]
        with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
        enqueued[id,q] = (qmsg, acquired=None)

routed(seq):
    if sender == self: localMessage.erase(seq)
    else: node[sender].routing.erase(seq)

acquire(id,q):
    enqueued[id,q].acquired = sender
    node[sender].acquired.push_back((id,q))
    if sender != self:
        with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])

release(id,q):
    enqueued[id,q].acquired = None
    node[sender].acquired.erase((id,q))
    if sender != self:
        with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])

reject(id,q):
    node[sender].routing[id.seq] = enqueued[id,q]  # prepare for re-queueing

rejected(id,q):
    node[sender].routing.erase(id.seq)

dequeue(id,q):
    entry = enqueued[id,q]
    enqueued.erase((id,q))
    node[entry.acquired].acquired.erase((id,q))
    if sender != self:
        with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)

member m leaves cluster:
    for key in node[m].acquired:
        release(key.id, key.q)
    node.erase(m)

*** Queue consumer locking

When a queue is locked it does not deliver messages to its consumers.

New broker::Queue functions:
- stopConsumers(): set the consumersStopped flag, wait for currently busy consumers to exit.
- startConsumers(): reset the consumersStopped flag.

Implementation sketch, locking omitted:

void Queue::stopConsumers() {
    consumersStopped = true;
    while (consumersBusy) consumersBusyMonitor.wait();
}

void Queue::startConsumers() {
    consumersStopped = false;
    listeners.notify();
}

bool Queue::dispatch(consumer) {
    if (consumersStopped) return false;
    ++consumersBusy;
    bool ok = do_regular_dispatch_body();
    if (--consumersBusy == 0) consumersBusyMonitor.notify();
    return ok;
}
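The sketch above deliberately omits locking. For illustration, a locked
version could look like the following; std::mutex and
std::condition_variable stand in for the broker's own monitor class, and
the dispatch body is reduced to a placeholder:

#include <condition_variable>
#include <mutex>

class Queue {
    std::mutex lock;
    std::condition_variable monitor;
    bool consumersStopped;
    int consumersBusy;
  public:
    Queue() : consumersStopped(false), consumersBusy(0) {}

    // Block new dispatches and wait until busy consumers have exited.
    void stopConsumers() {
        std::unique_lock<std::mutex> l(lock);
        consumersStopped = true;
        while (consumersBusy > 0) monitor.wait(l);
    }

    void startConsumers() {
        std::lock_guard<std::mutex> l(lock);
        consumersStopped = false;
        // The real code would also notify queue listeners to resume delivery.
    }

    bool dispatch() {
        {
            std::lock_guard<std::mutex> l(lock);
            if (consumersStopped) return false;
            ++consumersBusy;
        }
        bool dispatched = true;  // placeholder for do_regular_dispatch_body()
        {
            std::lock_guard<std::mutex> l(lock);
            if (--consumersBusy == 0) monitor.notify_all();
        }
        return dispatched;
    }
};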
*** QueueOwnerHandler

Invariants:
- Each queue is owned by at most one node at any time.
- Each node is interested in a set of queues at any given time.
- A queue is un-owned if no node is interested.

The queue owner releases the queue when:
- it loses interest, i.e. the queue has no consumers with credit, or
- a configured time delay expires and there are other interested nodes.

The owner mcasts release(q). On delivery, the new queue owner is the
next node in node-id order (treating the nodes as a circular list),
starting from the old owner, that is interested in the queue.

Queue consumers start out stopped; they are only started when we get
ownership from the cluster.

Thread safety: called by deliver, connection and timer threads, needs locking.

A thread-safe object per queue holds the queue's ownership status;
it is called by deliver, connection and timer threads:

class QueueOwnership {
    bool owned;
    Timer timer;
    BrokerQueue q;

    drop():  # locked
        if owned:
            owned = false
            q.stopConsumers()
            mcast release(q.name, false)
            timer.stop()

    take():  # locked
        if not owned:
            owned = true
            q.startConsumers()
            timer.start(timeout)

    timer.fire(): drop()
}

Data members, only modified/examined in the deliver thread:
- typedef set<NodeId> ConsumerSet
- map<QueueName, ConsumerSet> consumers
- map<QueueName, NodeId> owner

Thread-safe data members, accessed in connection threads (via BrokerHandler):
- map<QueueName, QueueOwnership> ownership

Multicast messages in the QueueOwner class:

consume(q):
    if sender == self and consumers[q].empty(): ownership[q].take()
    consumers[q].insert(sender)

release(q):
    assert(owner[q] == sender and owner[q] in consumers[q])
    owner[q] = circular search from sender in consumers[q]
    if owner[q] == self: ownership[q].take()

cancel(q):
    assert(owner[q] != sender)  # sender must release() before cancel()
    consumers[q].erase(sender)

member-leaves:
    for q in owner: if owner[q] == left: act as if left mcast release(q)
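The "circular search" in release() above could be implemented as in this
sketch; std::set keeps node ids ordered, so the next interested node after
the old owner is upper_bound(), wrapping to begin(). nextOwner() is a
hypothetical helper name.

#include <cstdint>
#include <set>

typedef std::uint64_t NodeId;

// Return the next node in node-id order after oldOwner that is interested
// in the queue, treating the set as circular. If oldOwner is the only
// interested node it is returned again; the caller handles the empty set.
NodeId nextOwner(const std::set<NodeId>& interested, NodeId oldOwner) {
    if (interested.empty()) return oldOwner;
    std::set<NodeId>::const_iterator i = interested.upper_bound(oldOwner);
    if (i == interested.end()) i = interested.begin();  // wrap around
    return *i;
}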
Need 2 more intercept points in broker::Cluster:

consume(q,consumer,consumerCount) - called from Queue::consume()
    if consumerCount == 1: mcast consume(q)

cancel(q,consumer,consumerCount) - called from Queue::cancel()
    if consumerCount == 0:
        ownership[q].drop()
        mcast cancel(q)

#TODO: lifecycle, updating cluster data structures when queues are destroyed.

*** Re-use of existing cluster code
- re-use Event.
- re-use Multicaster.
- re-use the same PollableQueue setup (may experiment later).
- new Core class to replace Cluster.
- keep the design modular, keep the threading rules clear.

** TODO [#B] Large message replication.
Need to be able to multicast large messages in fragments.

** TODO [#B] Batch CPG multicast messages.
The new cluster design involves a lot of small multicast messages;
they need to be batched into larger CPG messages for efficiency.

** TODO [#B] Genuine async completion.
Replace the current synchronous waiting implementation with genuine
async completion.

Test: enhance test_store.cpp to defer enqueueComplete till a special
message is received.

The async callback uses *requestIOProcessing* to queue the action on
the IO thread.

** TODO [#B] Async completion of accept when dequeue completes.
The interface is already there on broker::Message; we just need to
ensure that store and cluster implementations call it appropriately.

** TODO [#B] Replicate wiring.
From messageStore create/destroy/bind, replicate the encoded
declare/destroy/bind command.

** TODO [#B] New members joining - first pass

Re-use the update code from the old cluster but don't replicate
sessions & connections.

Need to extend it to send cluster IDs with messages.

Need to replicate the queue ownership data as part of the update.

** TODO [#B] Persistence support.
InitialStatus protocol etc. to support persistent start-up (existing code).

Only one broker recovers from its store; it updates the others.

Assign cluster IDs to messages recovered from the store, don't
replicate them. See Queue::recover.

** TODO [#B] Handle other ways that messages can leave a queue.

There are ways, other than via a consumer, that messages are taken off
a queue.

NOTE: these are not controlled by the queue lock; how do we make them
consistent?

The target broker may not have all the messages that are on other
brokers for purge/destroy:
- Queue::move() - need to wait for the lock? Replicate?
- Queue::get() - ???
- Queue::purge() - replicate the purge, or just delete what's on the local broker?
- Queue::destroy() - messages go to the alternate exchange; on all brokers?

Need to add call points & mcast messages to replicate these?

** TODO [#B] Flow control for internal queues.

Need to bound the size of the internal queues holding cluster events & frames:
- stop polling when we reach the bound.
- start polling when we get back under it.

** TODO [#B] Integration with transactions.
Do we want to replicate during the transaction and replicate
commit/rollback, or replicate only on commit?
No integration with DTX transactions.

** TODO [#B] Make the new cluster work with the replication exchange.
Possibly re-use some common logic. The replication exchange is like
clustering, except over TCP.

** TODO [#C] Async completion for declare, bind, destroy of queues and exchanges.
The cluster needs to complete these asynchronously to guarantee that
resources exist across the cluster when the command completes.

** TODO [#C] Allow non-replicated exchanges, queues.

Set qpid.replicated=false in declare arguments, set a flag on the
Exchange and Queue objects.
- save the replicated status to the store.
- support in management tools.
Replicated exchange: replicate binds to replicated queues.
Replicated queue: replicate all messages.

** TODO [#C] New members joining - improved.

Replicate wiring like the old cluster: stall for wiring but not for
messages. Update messages on a per-queue basis from back to front.

Updater:
- stall & push wiring: declare exchanges, queues, bindings.
- start an update iterator thread on each queue.
- unstall and process normally while the iterator threads run.

Update iterator thread:
- starts at the back of the updater's queue, message m.
- sends update_front(q,m) to the updatee and advances towards the front.
- at the front: sends update_done(q).

Updatee:
- stall, receive wiring, lock all queues, mark queues "updating", unstall.
- update_front(q,m): push m to the *front* of q.
- update_done(q): mark the queue "ready".

The updatee cannot take the queue consume lock for a queue that is updating.
The updatee *can* push messages onto a queue that is updating.

TODO: Is there any way to eliminate the stall for wiring?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "queue observers" with
intercept points in similar places:
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code; a
possible shape for this is sketched below.

In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
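Purely as a hypothetical sketch of that simplification (a suggestion
implied by the note above, not committed code), the shared intercept
points could be pulled into one observer interface that QueuePolicy,
QueuedEvents, MessageStore and Cluster would all implement:

#include <cstddef>
#include <vector>

struct QueuedMessage;  // broker type, forward-declared for the sketch

class QueueObserver {
  public:
    virtual ~QueueObserver() {}
    virtual void enqueued(const QueuedMessage&) = 0;
    virtual void acquired(const QueuedMessage&) = 0;
    virtual void requeued(const QueuedMessage&) = 0;
    virtual void dequeued(const QueuedMessage&) = 0;
};

// The queue would then notify a list of observers at each intercept point
// instead of calling each subsystem directly.
class ObservableQueue {
    std::vector<QueueObserver*> observers;
  public:
    void addObserver(QueueObserver* o) { observers.push_back(o); }
    void notifyDequeued(const QueuedMessage& m) {
        for (std::size_t i = 0; i < observers.size(); ++i)
            observers[i]->dequeued(m);
    }
};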
