-*-org-*-
Notes on new cluster implementation. See also: new-cluster-design.txt

* Implementation plan.
  
Co-existence with old cluster code and tests:
- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
- Double up tests with old version/new version as the new code develops.

Minimal POC for message delivery & perf test.
- no wiring replication, no updates, no failover, no persistence, no async completion.
- just implement publish and acquire/dequeue locking protocol.
- measure performance.

Full implementation of transient cluster
- Update (based on existing update), async completion etc.
- Passing all existing transient cluster tests.

Persistent cluster
- Make sure async completion works correctly.
- InitialStatus protocol etc. to support persistent start-up (existing code)
- cluster restart from store: stores not identical. Load one, update the rest.
  - assign cluster IDs to messages recovered from store, don't replicate.

Improved update protocol
- per-queue, less stalling, bounded catch-up.

* Task list

** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.

NOTE: as implementation questions arise, take the easiest option and make
a note for later optimization/improvement.

*** Tests
- python test: 4 senders, numbered messages, 4 receivers, verify message set.
- acquire then release messages: verify can be dequeued on any member
- acquire then kill broker: verify can be dequeued on other members.
- acquire then reject: verify goes on alt-exchange once only.

*** TODO broker::Cluster interface and call points.

Initial draft is committed.

Issues to review:

queue API: internal classes like RingQueuePolicy use Queue::acquire/dequeue
when messages are pushed. How to reconcile with queue ownership?

rejecting messages: if there's an alternate exchange where do we do the
re-routing? On origin broker or on all brokers?

Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
Intercepting client actions on the queue vs. internal actions
(e.g. ring policy)

*** Main classes

BrokerHandler:
- implements broker::Cluster intercept points.
- sends mcast events to inform cluster of local actions.
- thread safe, called in connection threads.

LocalMessageMap:
- Holds local messages while they are being enqueued.
- thread safe: called by both BrokerHandler and DeliverHandler

MessageHandler:
- handles delivered mcast messages related to messages.
- initiates local actions in response to mcast events.
- thread unsafe, only called in deliver thread.
- maintains view of cluster state regarding messages.

QueueOwnerHandler:
- handles delivered mcast messages related to queue consumer ownership.
- thread safe, called in deliver, connection and timer threads.
- maintains view of cluster state regarding queue ownership.

cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
- thread safe: manage state used by both DeliverHandler and BrokerHandler

The following code sketch illustrates only the "happy path"; error handling
is omitted.

*** BrokerHandler
Types:
- struct QueuedMessage { Message msg; QueueName q; Position pos; }
- SequenceNumber 64 bit sequence number to identify messages.
- NodeId 64 bit CPG node-id, identifies member of the cluster.
- struct MessageId { NodeId node; SequenceNumber seq; }

Members:
- atomic<SequenceNumber> sequence // sequence number for message IDs.
- thread_local bool noReplicate // suppress replication.
- thread_local bool isRouting // suppress operations while routing
- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.

NOTE: localMessage is also modified by DeliverHandler.

broker::Cluster intercept functions:

routing(msg)
  if noReplicate: return
  # Suppress everything except enqueues while we are routing.
  # We don't want to replicate acquires & dequeues caused by an enqueue,
  # e.g. removal of messages from ring/LV queues.
  isRouting = true 

enqueue(qmsg):
  if noReplicate: return
  if !qmsg.msg.id:
    seq = sequence++
    qmsg.msg.id = (self,seq)
    localMessage[seq] = qmsg
    mcast create(encode(qmsg.msg),seq)
  mcast enqueue(qmsg.q,qmsg.msg.id.seq)

routed(msg):
  if noReplicate: return
  if msg.id: mcast routed(msg.id.seq)
  isRouting = false

acquire(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  if qmsg.msg.id: mcast acquire(qmsg.msg.id, qmsg.q)

release(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  if qmsg.msg.id: mcast release(qmsg.msg.id, qmsg.q)

accept(qmsg):
  if noReplicate: return
  if isRouting: return # Ignore while we are routing a message.
  if qmsg.msg.id: mcast dequeue(qmsg.msg.id, qmsg.q)

reject(qmsg):
  isRejecting = true
  if qmsg.msg.id: mcast reject(qmsg.msg.id, qmsg.q)

rejected(qmsg):
  isRejecting = false
  mcast dequeue(qmsg.msg.id, qmsg.q)

dequeue(qmsg):
  # No mcast in dequeue, only used for local cleanup of resources.
  # E.g. messages that are replaced on an LVQ are dequeued without being
  # accepted or rejected. dequeue is called with the queue lock held.
  # FIXME revisit - move it out of the queue lock.
  cleanup(qmsg)
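
The routing/enqueue/routed suppression logic above can be modelled in a few
lines of Python. This is only a single-threaded sketch under assumptions: the
class name, the dict-based message and the mcast callable are hypothetical
stand-ins (the real code is C++, uses thread_local flags and multicasts over
CPG).

```python
import itertools


class BrokerHandlerSketch:
    """Hypothetical model of the BrokerHandler intercepts, not the real class."""

    def __init__(self, self_id, mcast):
        self.self_id = self_id              # NodeId of this broker
        self.mcast = mcast                  # callable(event, *args), stands in for CPG
        self.sequence = itertools.count(1)  # source of message sequence numbers
        self.local_message = {}             # seq -> (msg, queue) while enqueueing
        self.no_replicate = False
        self.is_routing = False

    def routing(self, msg):
        if self.no_replicate:
            return
        self.is_routing = True

    def enqueue(self, msg, queue):
        if self.no_replicate:
            return
        if msg.get("id") is None:
            # First enqueue of this message: assign an ID and mcast the body once.
            seq = next(self.sequence)
            msg["id"] = (self.self_id, seq)
            self.local_message[seq] = (msg, queue)
            self.mcast("create", msg["body"], seq)
        self.mcast("enqueue", queue, msg["id"][1])

    def routed(self, msg):
        if self.no_replicate:
            return
        if msg.get("id") is not None:
            self.mcast("routed", msg["id"][1])
        self.is_routing = False

    def acquire(self, msg, queue):
        if self.no_replicate or self.is_routing:
            return
        if msg.get("id") is not None:
            self.mcast("acquire", msg["id"], queue)


events = []
handler = BrokerHandlerSketch("node-a", lambda *event: events.append(event))
message = {"body": b"hello"}
handler.routing(message)
handler.enqueue(message, "q1")
handler.enqueue(message, "q2")
handler.acquire(message, "q1")   # suppressed: still routing
handler.routed(message)
handler.acquire(message, "q1")   # replicated: routing has finished
```

In this run the message body is multicast once (create) even though it is
enqueued on two queues, and the acquire issued during routing produces no
event.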

*** DeliverHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
- struct QueueKey { MessageId id; QueueName q; }
- typedef map<QueueKey, QueueEntry> Queue
- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }

Members:
- QueueEntry enqueued[QueueKey]
- Node node[NodeId]

Mcast messages in Message class:

create(msg,seq)
  if sender != self: node[sender].routing[seq] = decode(msg)

enqueue(q,seq):
  id = (sender,seq)
  if sender == self:
    enqueued[id,q] = (localMessage[seq], acquired=None)
  else:
    msg = node[sender].routing[seq]
    with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
    enqueued[id,q] = (qmsg, acquired=None)

routed(seq):
  if sender == self: localMessage.erase(seq)
  else: node[sender].routing.erase(seq)

acquire(id,q):
  enqueued[id,q].acquired = sender
  node[sender].acquired.push_back((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])

release(id,q):
  enqueued[id,q].acquired = None
  node[sender].acquired.erase((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])

reject(id,q):
  node[sender].routing[id] = enqueued[id,q] # prepare for re-queueing

rejected(id,q):
  node[sender].routing.erase(id)

dequeue(id,q):
  entry = enqueued[id,q]
  enqueued.erase((id,q))
  node[entry.acquired].acquired.erase((id,q))
  if sender != self:
    with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)

member m leaves cluster:
  for key in node[m].acquired:
    release(key.id, key.q)
  node.erase(m)
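
The bookkeeping done on delivery (who has acquired what, and what happens when
a member leaves) can be sketched as follows. This is an illustrative model
only: the class and method names are hypothetical, the local broker queue
operations are left out, and a guard for un-acquired messages is added in
dequeue that the pseudocode above does not show.

```python
class DeliverStateSketch:
    """Hypothetical model of the enqueued/acquired maps kept in the deliver thread."""

    def __init__(self):
        self.enqueued = {}   # (msg_id, queue) -> node that acquired it, or None
        self.acquired = {}   # node -> list of (msg_id, queue) keys it holds

    def enqueue(self, msg_id, queue):
        self.enqueued[(msg_id, queue)] = None

    def acquire(self, sender, msg_id, queue):
        key = (msg_id, queue)
        self.enqueued[key] = sender
        self.acquired.setdefault(sender, []).append(key)

    def release(self, sender, msg_id, queue):
        key = (msg_id, queue)
        self.enqueued[key] = None
        self.acquired[sender].remove(key)

    def dequeue(self, msg_id, queue):
        key = (msg_id, queue)
        owner = self.enqueued.pop(key)
        if owner is not None:          # assumption: tolerate un-acquired messages
            self.acquired[owner].remove(key)

    def member_leaves(self, member):
        # Release everything the departed member had acquired, then forget it.
        for msg_id, queue in list(self.acquired.get(member, [])):
            self.release(member, msg_id, queue)
        self.acquired.pop(member, None)


state = DeliverStateSketch()
m1, m2 = ("node-a", 1), ("node-a", 2)
state.enqueue(m1, "q")
state.enqueue(m2, "q")
state.acquire("node-b", m1, "q")
state.acquire("node-b", m2, "q")
state.dequeue(m1, "q")          # node-b accepted m1
state.member_leaves("node-b")   # m2 becomes available again
```

After node-b leaves, m2 is still enqueued but no longer acquired, so any
remaining member can dequeue it.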

*** Queue consumer locking

When a queue is locked it does not deliver messages to its consumers.

New broker::Queue functions:
- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
- startConsumers(): reset consumersStopped flag

Implementation sketch, locking omitted:

void Queue::stopConsumers() {
  consumersStopped = true;
  while (consumersBusy) consumersBusyMonitor.wait();
}

void Queue::startConsumers() {
  consumersStopped = false;
  listeners.notify();
}

bool Queue::dispatch(consumer) {
   if (consumersStopped) return false;
   ++consumersBusy;
   bool dispatched = do_regular_dispatch_body();
   if (--consumersBusy == 0) consumersBusyMonitor.notify();
   return dispatched;
}
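
The locking that the sketch above omits could look roughly like this, using a
single condition variable. This is a Python sketch under assumptions: the
ConsumerGate name is hypothetical, consumers are assumed to start un-stopped,
and the listeners.notify() wake-up from startConsumers() is left out.

```python
import threading


class ConsumerGate:
    """Hypothetical stand-in for the consumersStopped/consumersBusy logic."""

    def __init__(self):
        self._cond = threading.Condition()
        self._stopped = False   # assumed initial value
        self._busy = 0          # number of dispatches currently in progress

    def stop_consumers(self):
        with self._cond:
            self._stopped = True
            while self._busy:   # wait for busy consumers to exit
                self._cond.wait()

    def start_consumers(self):
        with self._cond:
            self._stopped = False

    def dispatch(self, deliver):
        with self._cond:
            if self._stopped:
                return False
            self._busy += 1
        try:
            deliver()           # runs outside the lock, like a normal dispatch
        finally:
            with self._cond:
                self._busy -= 1
                if self._busy == 0:
                    self._cond.notify_all()
        return True


gate = ConsumerGate()
delivered = []
first = gate.dispatch(lambda: delivered.append("m1"))
gate.stop_consumers()
second = gate.dispatch(lambda: delivered.append("m2"))
gate.start_consumers()
third = gate.dispatch(lambda: delivered.append("m3"))
```

The check of the stopped flag and the increment of the busy count happen under
the same lock, so stop_consumers() cannot return while a dispatch that passed
the check is still running.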

*** QueueOwnerHandler

Invariants:
- Each queue is owned by at most one node at any time.
- Each node is interested in a set of queues at any given time.
- A queue is un-owned if no node is interested.

The queue owner releases the queue when
- it loses interest, i.e. the queue has no consumers with credit.
- a configured time delay expires and there are other interested nodes.

The owner mcasts release(q). On delivery the new queue owner is the
next node in node-id order (treating nodes as a circular list)
starting from the old owner that is interested in the queue.
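
The circular search for the next owner can be sketched as below. The function
name is hypothetical, node IDs are assumed to be comparable integers, and the
sketch assumes the old owner itself is skipped; the notes do not say whether
it may be re-selected when no other node is interested.

```python
def next_owner(old_owner, interested):
    """Return the next interested node after old_owner in circular node-id order.

    Returns None if no other node is interested (the queue becomes un-owned).
    """
    candidates = sorted(n for n in interested if n != old_owner)
    if not candidates:
        return None
    for node in candidates:
        if node > old_owner:
            return node
    return candidates[0]   # wrap around to the lowest node-id


after_two = next_owner(2, {1, 2, 3})
after_three = next_owner(3, {1, 2, 3})
alone = next_owner(1, {1})
```

Because every member sees the same release(q) delivery and the same interest
set, each one computes the same new owner without further messages.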

Queue consumers initially are stopped, only started when we get
ownership from the cluster.

Thread safety: called by deliver, connection and timer threads, needs locking.

Thread safe object per queue holding queue ownership status.
Called by deliver, connection and timer threads.

class QueueOwnership {
  bool owned;
  Timer timer;
  BrokerQueue q;

  drop(): # locked
    if owned:
      owned = false
      q.stopConsumers()
      mcast release(q.name, false)
      timer.stop()

  take(): # locked
    if not owned:
      owned = true
      q.startConsumers()
      timer.start(timeout)

  timer.fire(): drop()
}
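
A locked version of take()/drop() might look like the following Python sketch.
It is illustrative only: the callbacks passed to the constructor are
hypothetical stand-ins for q.startConsumers(), q.stopConsumers() and the
release mcast, and the timer is left out.

```python
import threading


class QueueOwnershipSketch:
    """Hypothetical per-queue ownership flag, safe to call from several threads."""

    def __init__(self, name, start_consumers, stop_consumers, mcast_release):
        self.name = name
        self._start = start_consumers
        self._stop = stop_consumers
        self._mcast_release = mcast_release
        self._lock = threading.Lock()
        self.owned = False

    def take(self):
        with self._lock:
            if not self.owned:
                self.owned = True
                self._start()

    def drop(self):
        with self._lock:
            if self.owned:
                self.owned = False
                self._stop()
                self._mcast_release(self.name)


log = []
ownership = QueueOwnershipSketch(
    "q",
    start_consumers=lambda: log.append("start"),
    stop_consumers=lambda: log.append("stop"),
    mcast_release=lambda name: log.append(("release", name)),
)
ownership.take()
ownership.take()   # no effect: already owned
ownership.drop()
ownership.drop()   # no effect: already dropped
```

Both operations are idempotent, so a timer firing at the same moment as a
cancel from a connection thread results in a single release.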

Data Members, only modified/examined in deliver thread:
- typedef set<NodeId> ConsumerSet
- map<QueueName, ConsumerSet> consumers
- map<QueueName, NodeId> owner

Thread safe data members, accessed in connection threads (via BrokerHandler):
- map<QueueName, QueueOwnership> ownership

Multicast messages in QueueOwner class:

consume(q):
  if sender==self and consumers[q].empty(): ownership[q].take()
  consumers[q].insert(sender)

release(q):
  assert(owner[q] == sender and owner[q] in consumers[q])
  owner[q] = circular search from sender in consumers[q]
  if owner[q] == self: ownership[q].take()

cancel(q):
  assert(owner[q] != sender) # sender must release() before cancel()
  consumers[q].erase(sender)

member-leaves:
  for q in owner: if owner[q] == left: left.release(q)

Need 2 more intercept points in broker::Cluster:

consume(q,consumer,consumerCount) - Queue::consume()
  if consumerCount == 1: mcast consume(q)

cancel(q,consumer,consumerCount) - Queue::cancel()
  if consumerCount == 0:
    ownership[q].drop()
  mcast cancel(q)

#TODO: lifecycle, updating cluster data structures when queues are destroyed

*** Re-use of existing cluster code
- re-use Event
- re-use Multicaster
- re-use same PollableQueueSetup (may experiment later)
- new Core class to replace Cluster.
- keep design modular, keep threading rules clear.

** TODO [#B] Large message replication.
Need to be able to multicast large messages in fragments
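
One possible shape for fragmenting, sketched in Python under assumptions: the
function names and the (index, count, chunk) tuple layout are hypothetical,
and nothing here reflects an actual wire format.

```python
def fragment(payload, max_size):
    """Split payload into (index, count, chunk) tuples of at most max_size bytes."""
    count = max(1, -(-len(payload) // max_size))   # ceiling division, at least 1
    return [
        (i, count, payload[i * max_size:(i + 1) * max_size])
        for i in range(count)
    ]


def reassemble(fragments):
    """Rebuild the payload from a complete set of fragments, in any order."""
    ordered = sorted(fragments, key=lambda frag: frag[0])
    expected = ordered[0][1]
    if len(ordered) != expected:
        raise ValueError("missing fragments")
    return b"".join(chunk for _, _, chunk in ordered)


data = bytes(range(256)) * 10          # 2560 bytes
pieces = fragment(data, 1000)
restored = reassemble(list(reversed(pieces)))
```

With totally ordered multicast delivery the fragments from one sender should
arrive in order, so the sort is only a safeguard in this sketch.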

** TODO [#B] Batch CPG multicast messages
The new cluster design involves a lot of small multicast messages,
they need to be batched into larger CPG messages for efficiency.
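
A minimal Python sketch of such batching, assuming a simple 4-byte
length-prefix framing; the function names and the framing are hypothetical
and not taken from the existing Event or Multicaster code.

```python
import struct


def pack_batches(messages, max_bytes):
    """Greedily pack length-prefixed messages into batches of about max_bytes."""
    batches, current = [], b""
    for msg in messages:
        frame = struct.pack("!I", len(msg)) + msg
        if current and len(current) + len(frame) > max_bytes:
            batches.append(current)
            current = b""
        current += frame   # an oversized message still gets a batch of its own
    if current:
        batches.append(current)
    return batches


def unpack_batch(batch):
    """Recover the individual messages from one batch."""
    messages, offset = [], 0
    while offset < len(batch):
        (length,) = struct.unpack_from("!I", batch, offset)
        offset += 4
        messages.append(batch[offset:offset + length])
        offset += length
    return messages


small = [b"a" * 10, b"b" * 10, b"c" * 10]
batches = pack_batches(small, max_bytes=30)
unpacked = [m for batch in batches for m in unpack_batch(batch)]
```

Each framed message here is 14 bytes, so two fit in the first 30-byte batch
and the third goes into a second batch.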
** TODO [#B] Genuine async completion
Replace current synchronous waiting implementation with genuine async completion.

Test: enhance test_store.cpp to defer enqueueComplete till special message received.

Async callback uses *requestIOProcessing* to queue action on IO thread.

** TODO [#B] Async completion of accept when dequeue completes.
Interface is already there on broker::Message, just need to ensure
that store and cluster implementations call it appropriately.

** TODO [#B] Replicate wiring.
From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.

** TODO [#B] New members joining - first pass

Re-use update code from old cluster but don't replicate sessions &
connections.

Need to extend it to send cluster IDs with messages.

Need to replicate the queue ownership data as part of the update.

** TODO [#B] Persistence support.
InitialStatus protocol etc. to support persistent start-up (existing code)

Only one broker recovers from store, update to others.

Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.

** TODO [#B] Handle other ways that messages can leave a queue.

Other ways (other than via a consumer) that messages are taken off a queue.

NOTE: Not controlled by queue lock, how to make them consistent?

Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
- Queue::purge() - replicate purge? Or just delete what's on the broker?
- Queue::destroy() - messages to alternate exchange on all brokers?

Need to add callpoints & mcast messages to replicate these?

** TODO [#B] Flow control for internal queues.
   
Need to bound the size of the internal queues holding cluster events & frames.
- stop polling when we reach bound.
- start polling when we get back under it.
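
The stop/start rule can be sketched as below. The class name and the polling
flag are hypothetical; in the real code the flag would correspond to whether
the poller reads from the underlying source.

```python
from collections import deque


class BoundedEventQueue:
    """Hypothetical internal queue that pauses polling while at its bound."""

    def __init__(self, bound):
        self.bound = bound
        self.events = deque()
        self.polling = True

    def push(self, event):
        self.events.append(event)
        if len(self.events) >= self.bound:
            self.polling = False   # reached the bound: stop polling

    def pop(self):
        event = self.events.popleft()
        if len(self.events) < self.bound:
            self.polling = True    # back under the bound: resume polling
        return event


queue = BoundedEventQueue(bound=2)
queue.push("e1")
polling_after_one = queue.polling
queue.push("e2")
polling_at_bound = queue.polling
first_event = queue.pop()
polling_after_pop = queue.polling
```
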
** TODO [#B] Integration with transactions.
Do we want to replicate during transaction & replicate commit/rollback
or replicate only on commit?
No integration with DTX transactions.
** TODO [#B] Make new cluster work with replication exchange.
Possibly re-use some common logic. Replication exchange is like clustering
except over TCP.
** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
Cluster needs to complete these asynchronously to guarantee resources
exist across the cluster when the command completes.

** TODO [#C] Allow non-replicated exchanges, queues.
   
Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
- save replicated status to store.
- support in management tools.
Replicated exchange: replicate binds to replicated queues.
Replicated queue: replicate all messages.

** TODO [#C] New members joining - improved.

Replicate wiring like old cluster, stall for wiring but not for
messages.  Update messages on a per-queue basis from back to front.

Updater:
- stall & push wiring: declare exchanges, queues, bindings.
- start update iterator thread on each queue.
- unstall and process normally while iterator threads run.

Update iterator thread:
- starts at back of updater queue, message m.
- send update_front(q,m) to updatee and advance towards front
- at front: send update_done(q)

Updatee:
- stall, receive wiring, lock all queues, mark queues "updating", unstall
- update_front(q,m): push m to *front* of q
- update_done(q): mark queue "ready"

Updatee cannot take the queue consume lock for a queue that is updating.
Updatee *can* push messages onto a queue that is updating.
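
The back-to-front scheme keeps message order intact even when new messages
arrive during the update, as this Python sketch shows. All names are
hypothetical, and a list snapshot stands in for the update iterator thread.

```python
from collections import deque


def update_events(updater_queue):
    """Yield update events for a queue snapshot, walking from back to front."""
    for message in reversed(updater_queue):
        yield ("update_front", message)
    yield ("update_done",)


class UpdateeQueue:
    """Hypothetical updatee-side queue that is being updated."""

    def __init__(self):
        self.messages = deque()
        self.ready = False

    def apply(self, event):
        if event[0] == "update_front":
            self.messages.appendleft(event[1])   # push to the *front*
        else:
            self.ready = True                    # update_done

    def push(self, message):
        self.messages.append(message)            # normal enqueue at the back


snapshot = ["m1", "m2", "m3"]
events = update_events(snapshot)
updatee = UpdateeQueue()
updatee.apply(next(events))      # receives m3 first
updatee.push("m4")               # new message replicated during the update
for event in events:
    updatee.apply(event)
final_order = list(updatee.messages)
```

Old messages are prepended while new ones are appended, so the two streams
never interleave and the updatee ends up with the same order as the updater.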

TODO: Is there any way to eliminate the stall for wiring?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "Queue observers" with intercept
points in similar places.
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code.

In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
