-*-org-*-

* A new design for Qpid clustering.

** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts
events and the events from all brokers are serialized and delivered in
the same order to each broker.

In the current design raw byte buffers from client connections are
multicast, serialized and delivered in the same order to each broker.

Each broker has a replica of all queues, exchanges, bindings and also
all connections & sessions from every broker. Cluster code treats the
broker as a "black box", it "plays" the client data into the
connection objects and assumes that by giving the same input, each
broker will reach the same state.

A new broker joining the cluster receives a snapshot of the current
cluster state, and then follows the multicast conversation.

*** Maintenance issues.

The entire state of each broker is replicated to every member:
connections, sessions, queues, messages, exchanges, management objects
etc. Any discrepancy in the state that affects how messages are
allocated to consumers can cause an inconsistency.

- Entire broker state must be faithfully updated to new members.
- Management model also has to be replicated.
- All queues are replicated, can't have unreplicated queues (e.g. for management)
  
Events that are not deterministically predictable from the client
input data stream can cause inconsistencies. In particular use of
timers/timestamps require cluster workarounds to synchronize.

A member that encounters an error which is not encounted by all other
members is considered inconsistent and will shut itself down. Such
errors can come from any area of the broker code, e.g. different
ACL files can cause inconsistent errors.

The following areas required workarounds to work in a cluster:

- Timers/timestamps in broker code: management, heartbeats, TTL
- Security: cluster must replicate *after* decryption by security layer.
- Management: not initially included in the replicated model, source of many inconsistencies.

It is very easy for someone adding a feature or fixing a bug in the
standalone broker to break the cluster by:
- adding new state that needs to be replicated in cluster updates.
- doing something in a timer or other non-connection thread.

It's very hard to test for such breaks. We need a looser coupling
and a more explicitly defined interface between cluster and standalone
broker code.

*** Performance issues.

Virtual synchrony delivers all data from all clients in a single
stream to each broker.  The cluster must play this data thru the full
broker code stack: connections, sessions etc. in a single thread
context. The cluster has a pipelined design to get some concurrency
but this is a severe limitation on scalability in multi-core hosts
compared to the standalone broker which processes each connection
in a separate thread context.

** A new cluster design.
   
Maintain /equivalent/ state not /identical/ state on each member. 

Messages from different sources need not be ordered identically on all members. 

Use a moving queue ownership protocol to agree order of dequeues, rather
than relying on identical state and lock-step behavior to cause
identical dequeues on each broker.

Clearly defined interface between broker code and cluster plug-in.

*** Requirements

The cluster must provide these delivery guarantees: 

- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.


Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept)

The cluster must provide this ordering guarantee:

- messages from the same publisher received by the same subscriber
  must be received in the order they were sent (except in the case of
  re-queued messages.)

*** Broker receiving messages
     
On recieving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue the message on the local queue.
- asynchronously complete the transfer when the message-received is self-delivered.

This like asynchronous completion in the MessageStore: the cluster
"stores" a message by multicast. We send a completion to the client
asynchronously when the multicast "completes" by self-delivery. This
satisfies the "client sends transfer" guarantee, but makes the message
available on the queue immediately, avoiding the multicast latency.

It also moves most of the work to the client connection thread. The
only work in the virtual synchrony deliver thread is sending the client
completion.

Other brokers enqueue the message when they recieve the
message-received event, in the virtual synchrony deliver thread.

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the cluster.
Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.

This means we no longer need identical message ordering on all brokers
to get consistent dequeuing.  Messages from different sources can be
ordered differently on different brokers. 

We assume the same IO-driven dequeuing algorithm as the standalone
broker with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.

At any given time only those queues owned by the local broker will be
unlocked.

As messages are acquired/dequeued from unlocked queues by the IO threads
the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.

*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.

On receiving an accept the broker:
- dequeues the message from the local queue
- multicasts a "dequeue" event
- completes the accept asynchronously when the dequeue event is self delivered. 

NOTE: The message store does not currently implement asynchronous
completions of accept, this is a bug.

** Inconsistent errors.

The new design eliminates most sources of inconsistent errors
(connections, sessions, security, management etc.) The only points
where inconsistent errors can occur are at enqueue and dequeue (most
likely store-related errors.)

The new design can use the exisiting error-handling protocol with one
major improvement: since brokers are no longer required to maintain
identical state they do not have to stall processing while an error is
being resolved.

#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.

When a new member (the updatee) joins a cluster it needs to be brought up to date.
The old cluster design an existing member (the updater) sends a state snapshot.

To ensure consistency of the snapshot both the updatee and the updater
"stall" at the point of the update. They stop processing multicast
events and queue them up for processing when the update is
complete. This creates a back-log of work for each to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in a bounded time.)

** Updating new members

When a new member (the updatee) joins a cluster it needs to be brought
up to date with the rest of the cluster.  An existing member (the
updater) sends an "update".

In the old cluster design the update is a snapshot of the entire
broker state.  To ensure consistency of the snapshot both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a back-log of work to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in a bounded time.)

With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.

Update of wiring (exchanges, queues, bindings) is the same as current
design.

Update of messages is different:
- per-queue rather than per-broker, separate queues can be updated in parallel.
- updates queues in reverse order to eliminate unbounded catch-up
- does not require updater & updatee to stall during update.

Replication events, multicast to cluster:
- enqueue(q,m): message m pushed on back of queue q .
- acquire(q,m): mark m acquired
- dequeue(q,m): forget m.
Messages sent on update connection:
- update_front(q,m): during update, receiver pushes m to *front* of q
- update_done(q): during update, update of q is complete.
  
Updater: 
- when updatee joins set iterator i = q.end()
- while i != q.begin(): --i; send update_front(q,*i) to updatee
- send update_done(q) to updatee

Updatee:
- q initially in locked state, can't dequeue locally.
- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
- receive update_front(q,m): q.push_front(m)
- receive update_done(q): q can be unlocked for local dequeing.

Benefits:
- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
- During update consumers actually help by removing messages before they need to be updated.
- Needs no separate "work to do" queue, only the broker queues themselves.

# TODO how can we recover from updater crashing before update complete?
# Clear queues that are not updated & send request for udpates on those queues?

# TODO updatee may receive a dequeue for a message it has not yet seen, needs
# to hold on to that so it can drop the message when it is seen.
# Similar problem exists for wiring?
  
** Cluster API

The new cluster API is similar to the MessageStore interface.
(Initially I thought it would be an extension of the MessageStore interface,
but as the design develops it seems better to make it a separate interface.)

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.

The cluster will require some extensions to the Queue:
- Queues can be "locked", locked queues are ignored by IO-driven output.
- Messages carry a cluster-message-id.
- messages can be dequeued by cluster-message-id

** Maintainability

This design gives us more robust code with a clear and explicit interfaces.

The cluster depends on specific events clearly defined by an explicit
interface. Provided the semantics of this interface are not violated,
the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions and so is
protected from future protocol changes (e.g. AMQP 1.0)

A number of specific ways the code will be simplified:
- drop code to replicate management model. 
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simper inconsistent-error handling code, no need to stall.

** Performance

The only way to verify the relative performance of the new design is
to prototype & profile. The following points suggest the new design
may scale/perform better:

Moving work from virtual synchrony thread to connection threads where
it can be done in parallel:
- All connection/session logic moves to connection thread.
- Exchange routing logic moves to connection thread.
- Local broker does all enqueue/dequeue work in connection thread
- Enqueue/dequeue are IO driven as for a standalone broker.

Optimizes common cases (pay for what you use):
- Publisher/subscriber on same broker: replication is fully asynchronous, no extra latency.
- Unshared queue: dequeue is all IO-driven in connection thread.
- Time sharing: pay for time-sharing only if queue has consumers on multiple brokers. 

#TODO Not clear how the time sharing algorithm would compare with the existing cluster delivery algorithm.

Extra decode/encode: The old design multicasts raw client data and
decodes it in the virtual synchrony thread. The new design would
decode messages in the connection thread, re-encode them for
multicast, and decode (on non-local brokers) in the virtual synchrony
thread. There is extra work here, but only in the *connection* thread:
on a multi-core machine this happens in parallel for every connection,
so it probably is not a bottleneck. There may be scope to optimize
decode/re-encode by re-using some of the original encoded data, this
could also benefit the stand-alone broker.

** Asynchronous queue replication

The existing "asynchronous queue replication" feature maintains a
passive backup passive backup of queues on a remote broker over a TCP
connection.

The new cluster replication protocol could be re-used to implement
asynchronous queue replication: its just a special case where the
active broker is always the queue owner and the enqueue/dequeue
messages are sent over a TCP connection rather than multicast.

The new update update mechanism could also work with 'asynchronous
queue replication', allowing such replication (over a TCP connection
on a WAN say) to be initiated after the queue had already been created
and been in use (one of the key missing features).

** Misc outstanding issues & notes

Message IDs: need an efficient cluster-wide message ID.

Replicating wiring
- Need async completion of wiring commands? 
- qpid.sequence_counter: need extra work to support in new design, do we care?

Cluster+persistence:
- finish async completion: dequeue completion for store & cluster
- cluster restart from store: clean stores *not* identical, pick 1, all others update.
- need to generate cluster ids for messages recovered from store.
  
Live updates: we don't need to stall brokers during an update!
- update on queue-by-queue basis.
- updatee locks queues during update, no dequeue.
- update in reverse: don't update messages dequeued during update.
- updatee adds update messages at front (as normal), replicated messages at back.
- updater starts from back, sends "update done" when it hits front of queue.

Flow control: need to throttle multicasting
1. bound the number of outstanding multicasts.
2. ensure the entire cluster keeps up, no unbounded "lag"
The existing design uses read-credit to solve 1., and does not solve 2.
New design should stop reading on all connections while flow control
condition exists? 

Can federation also be unified, at least in configuration?

Consider queues (and exchanges?) as having "reliability" attributes:
- persistent: is the message stored on disk.
- backed-up (to another broker): active/passive async replication.
- replicated (to a cluster): active/active multicast replication to cluster.
- federated: federation link to a queue/exchange on another broker.

"Reliability" seems right for the first 3 but not for federation, is
there a better term?

Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.

