summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-18 19:36:13 +0000
committerAlan Conway <aconway@apache.org>2010-10-18 19:36:13 +0000
commita08d54e27d4e91b52c5979cc566ab3e933878983 (patch)
tree7f57ad88051e4a02f52d4bdf395968549e24f57a /cpp/src/qpid/cluster
parent8e53bc375ef2bfb4b05cc32b4a8c0042d95b9ec2 (diff)
downloadqpid-python-a08d54e27d4e91b52c5979cc566ab3e933878983.tar.gz
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt. qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies the broker makes the expected calls on broker::Cluster in various situations. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1023966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/new-cluster-design.txt3
-rw-r--r--cpp/src/qpid/cluster/new-cluster-plan.txt439
2 files changed, 442 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt
index 392de890c3..8ee740372d 100644
--- a/cpp/src/qpid/cluster/new-cluster-design.txt
+++ b/cpp/src/qpid/cluster/new-cluster-design.txt
@@ -75,6 +75,8 @@ Use a moving queue ownership protocol to agree order of dequeues, rather
than relying on identical state and lock-step behavior to cause
identical dequeues on each broker.
+Clearly defined interface between broker code and cluster plug-in.
+
*** Requirements
The cluster must provide these delivery guarantees:
@@ -365,3 +367,4 @@ there a better term?
Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.
+
diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt
new file mode 100644
index 0000000000..57c1241607
--- /dev/null
+++ b/cpp/src/qpid/cluster/new-cluster-plan.txt
@@ -0,0 +1,439 @@
+-*-org-*-
+Notes on new cluster implementation. See also: new-cluster-design.txt
+
+* Implementation plan.
+
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
+
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- measure performance.
+
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
+
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protoocl etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster ID's to messages recovered from store, don't replicate.
+
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
+
+* Task list
+
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued other members.
+- acquire then reject: verify goes on alt-exchange once only.
+
+*** TODO broker::Cluster interface and call points.
+
+Initial draft is commited.
+
+Issues to review:
+
+queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue
+when messages are pushed. How to reconcile with queue ownership?
+
+rejecting messages: if there's an alternate exchange where do we do the
+re-routing? On origin broker or on all brokers?
+
+Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
+Intercepting client actions on the queue vs. internal actions
+(e.g. ring policy)
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and DeliverHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both DeliverHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path" error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; Position pos; }
+- SequenceNumber 64 bit sequence number to identify messages.
+- NodeId 64 bit CPG node-id, identifies member of the cluster.
+- struct MessageId { NodeId node; SequenceNumber seq; }
+
+Members:
+- atomic<SequenceNumber> sequence // sequence number for message IDs.
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+
+NOTE: localMessage is also modified by DeliverHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg)
+ if noReplicate: return
+ # Supress everything except enqueues while we are routing.
+ # We don't want to replicate acquires & dequeues caused by an enqueu,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+ if !qmsg.msg.id:
+ seq = sequence++
+ qmsg.msg.id = (self,seq)
+ localMessage[seq] = qmsg
+ mcast create(encode(qmsg.msg),seq)
+ mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+
+routed(msg):
+ if noReplicate: return
+ if msg.id: mcast routed(msg.id.seq)
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast acquire(msg.id, q)
+
+release(QueuedMessage)
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast release(id, q)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast dequeue(msg.id, msg.q)
+
+reject(QueuedMessage):
+ isRejecting = true
+ if msg.id: mcast reject(msg.id, msg.q)
+
+rejected(QueuedMessage):
+ isRejecting = false
+ mcast dequeue(msg.id, msg.q)
+
+dequeue(QueuedMessage)
+ # No mcast in dequeue, only used for local cleanup of resources.
+ # E.g. messages that are replaced on an LVQ are dequeued without being
+ # accepted or rejected. dequeue is called with the queue lock held
+ # FIXME revisit - move it out of the queue lock.
+ cleanup(msg)
+
+*** DeliverHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq)
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+ enqueued[id,q] = (localMessage[seq], acquired=None)
+ else:
+ msg = sender.routing[seq]
+ enqueued[id,q] = (qmsg, acquired=None)
+ with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+
+routed(seq):
+ if sender == self: localMessage.erase(msg.id.seq)
+ else: sender.routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q)
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self
+ with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q)
+ sender.routing.erase[id]
+
+dequeue(id,q)
+ entry = enqueued[id,q]
+ enqueued.erase[id,q]
+ node[entry.acquired].acquired.erase(id,q)
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+ if (consumersStopped) return false;
+ ++consumersBusy;
+ do_regular_dispatch_body()
+ if (--consumersBusy == 0) consumersBusyMonitor.notify();
+}
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node in node-id order (treating nodes as a circular list)
+starting from the old owner that is interested in the queue.
+
+Queue consumers initially are stopped, only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
+
+release(q):
+ asssert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q]
+ if owner==self: ownership[q].take()
+
+cancel(q):
+ assert(queue[q].owner != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
+
+member-leaves:
+ for q in queue: if owner[q] = left: left.release(q)
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Need to be able to multicast large messages in fragments
+
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protoocl etc. to support persistent start-up (existing code)
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are take off a queue.
+
+NOTE: Not controlled by queue lock, how to make them consistent?
+
+Target broker may not have all messages on other brokers for purge/destroy.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on broker ?
+- Queue::destroy() - messages to alternate exchange on all brokers.?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of the internal queues holding cluster events & frames.
+- stop polling when we reach bound.
+- start polling when we get back under it.
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "Queue observers" with intercept
+points in similar places.
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code.
+
+In particular QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.