<html>
    <head>
        <title>Apache Qpid : Cluster Design Note</title>
	    <link rel="stylesheet" href="styles/site.css" type="text/css" />
        <META http-equiv="Content-Type" content="text/html; charset=UTF-8">	    
    </head>

    <body>
	    <table class="pagecontent" border="0" cellpadding="0" cellspacing="0" width="100%" bgcolor="#ffffff">
		    <tr>
			    <td valign="top" class="pagebody">
				    <div class="pageheader">
					    <span class="pagetitle">
                            Apache Qpid : Cluster Design Note
                                                    </span>
				    </div>
				    <div class="pagesubheading">
					    This page last changed on Apr 20, 2007 by <font color="#0050B2">aconway</font>.
				    </div>

				    <hr />

<div>
<ul>
  <li><a href='#ClusterDesignNote-Overview'>Overview</a>
<ul>
  <li><a href='#ClusterDesignNote-Clientsiderequirements'>Client side requirements</a></li>
  <li><a href='#ClusterDesignNote-Clusterprotocols'>Cluster protocols</a></li>
  <li><a href='#ClusterDesignNote-Sessions'>Sessions</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-TypesofstateWehavetoconsiderseveralkindsofstate%3A'>Types of state</a>
<ul>
  <li>
<ul>
  <li><a href='#ClusterDesignNote-TheClusterMap%3AMembershipandWiring'>The Cluster Map: Membership and  Wiring</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-ProxiesandQueueContent'>Proxies and Queue Content</a>
<ul>
  <li><a href='#ClusterDesignNote-FragmentedSharedQueues'>Fragmented Shared Queues</a></li>
</ul></li>
</ul></li>
  <li><a href='#ClusterDesignNote-SessionState'>Session State</a>
<ul>
  <li><a href='#ClusterDesignNote-Inflightcommands'>In-flight commands</a></li>
  <li><a href='#ClusterDesignNote-Resumingachannel'>Resuming a channel</a></li>
  <li><a href='#ClusterDesignNote-Replicatingsessionstate.'>Replicating session state.</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-MappingofAMQPcommandstoreplicationmechanisms'>Mapping of AMQP commands  to replication mechanisms</a>
<ul>
  <li><a href='#ClusterDesignNote-queue.declare%2Fbind%2Fdelete%2Cexchange.declare%2Fdelete'>queue.declare/bind/delete, exchange.declare/delete</a></li>
  <li><a href='#ClusterDesignNote-message.transfer%2Fbasic.publish%28clienttobroker%29'>message.transfer/basic.publish (client to broker)</a></li>
  <li><a href='#ClusterDesignNote-message.transfer%28brokertoclient%29%2Cmessage.deliver'>message.transfer(broker to client), message.deliver</a></li>
  <li><a href='#ClusterDesignNote-message.consume%2Fbasic.consume'>message.consume/basic.consume</a></li>
  <li><a href='#ClusterDesignNote-basic.ack%2Fmessage.ok%28fromclient%29'>basic.ack/message.ok(from client)</a></li>
  <li><a href='#ClusterDesignNote-basic.ack%2Fmessage.ok%28frombroker%29'>basic.ack/message.ok(from broker)</a></li>
  <li><a href='#ClusterDesignNote-basic.reject%2Fmessage.reject'>basic.reject / message.reject</a></li>
  <li><a href='#ClusterDesignNote-reference.open%2Fapppend%2Fclose%28clienttobroker%29'>reference.open/append/close (client to broker)</a></li>
  <li><a href='#ClusterDesignNote-reference.open%2Fapppend%2Fclose%28brokertoclient%29'>reference.open/append/close (broker to client)</a></li>
  <li><a href='#ClusterDesignNote-Allcommands'>All commands</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-ClientBrokerProtocol'>Client-Broker Protocol</a></li>
  <li><a href='#ClusterDesignNote-BrokerBrokerProtocol'>Broker-Broker Protocol</a></li>
  <li><a href='#ClusterDesignNote-PersistenceandRecovery'>Persistence and Recovery</a>
<ul>
  <li><a href='#ClusterDesignNote-Competingfailuremodes%3A'>Competing failure modes:</a></li>
  <li><a href='#ClusterDesignNote-Persistenceoverview'>Persistence overview</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-Journals'>Journals</a>
<ul>
  <li><a href='#ClusterDesignNote-Overview'>Overview</a></li>
  <li><a href='#ClusterDesignNote-Useofjournals'>Use of journals</a></li>
  <li><a href='#ClusterDesignNote-Whataboutdisklessreliability%3F'>What about diskless reliability?</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-Virtualsynchrony'>Virtual synchrony</a></li>
  <li><a href='#ClusterDesignNote-Configuration'>Configuration</a>
<ul>
  <li><a href='#ClusterDesignNote-SimplifyingpatternsPossiblewaystoconfigureacluster%3A'>Simplifying patterns</a></li>
  <li><a href='#ClusterDesignNote-Dynamicclusterconfiguration'>Dynamic cluster configuration</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-Transactions'>Transactions</a>
<ul>
  <li><a href='#ClusterDesignNote-Localtransactions'>Local transactions</a></li>
  <li><a href='#ClusterDesignNote-DistributedTransactions'>Distributed Transactions</a></li>
</ul></li>
  <li><a href='#ClusterDesignNote-OpenQuestions'>Open Questions</a></li>
</ul></div>

<hr />


<h1><a name="ClusterDesignNote-Overview"></a>Overview</h1>

<p>A Qpid <em>cluster</em> is a "virtual AMQP broker" distributed over multiple processes on multiple hosts. The cluster continues to provide service in the event of failures of its members.</p>

<p>Reliability guarantees depend on configuration; there may be configurable trade-offs between reliability and performance or hardware requirements.</p>

<p>Clustering is <em>transparent</em> to clients. Cluster clients are standard AMQP clients and use the cluster via the AMQP protocol just as they would a standard broker. If a client is disconnected unexpectedly, it can fail over transparently to another member.</p>

<p>We want to address two major scenarios:</p>
<ul>
	<li>Cluster backed by high-performance SAN storage.</li>
	<li>Cluster with only unshared local storage on each node.</li>
</ul>


<h2><a name="ClusterDesignNote-Clientsiderequirements"></a>Client side requirements</h2>

<p><em>Transparent failover</em>: the <em>failover manager</em> component of the Qpid client libraries provides a <em>virtual AMQP connection</em>. Client applications send and receive AMQP commands as if conducting an uninterrupted conversation with a single broker.</p>

<p>In reality the virtual connection may be a series of real network connections to different cluster members. The failover manager negotiates re-connection and state synchronization, but failover commands <em>do not appear</em> on the virtual connection.</p>

<p>Imagine an AMQP conversation as a paper tape. The real conversation may be multiple strips of torn tape. Near the tears are failover commands that the client never sees. The failover manager cuts off the ragged edges and glues pieces together into a single tape that looks like an uninterrupted conversation with a single broker.</p>

<p>The failover manager must present a virtual conversation consistent with the failed conversations that <em>could have happened</em> with a single broker. It does not have to exactly duplicate the timing or ordering of the real conversations.</p>
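
<p>A minimal sketch of the failover manager's reconnect loop, in Python-style pseudocode. The connect callable and the resume() handshake are assumptions for illustration, not real Qpid client APIs; the point is that the application only ever sees the virtual connection while the manager splices real connections together underneath.</p>

<pre>
class VirtualConnection:
    """Illustrative failover manager: the application sees one unbroken conversation."""
    def __init__(self, connect, candidates, session_id):
        self.connect = connect        # callable taking a broker address (assumed)
        self.candidates = candidates  # most recent failover candidate list
        self.session_id = session_id
        self.conn = self._reconnect()

    def _reconnect(self):
        # Failover commands are exchanged here; they never appear on the
        # virtual connection that the application sees.
        for broker in self.candidates:
            try:
                conn = self.connect(broker)
                conn.resume(self.session_id)   # assumed resume/replay handshake
                return conn
            except ConnectionError:
                continue
        raise ConnectionError("no cluster member reachable")

    def send(self, command):
        # On unexpected disconnect, splice a new real connection in and retry.
        while True:
            try:
                return self.conn.send(command)
            except ConnectionError:
                self.conn = self._reconnect()
</pre>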


<h2><a name="ClusterDesignNote-Clusterprotocols"></a>Cluster protocols</h2>

<p>We will use two types of protocol to synchronize the cluster:</p>
<ul>
	<li>Virtual synchrony: Open AIS, CPG etc.</li>
	<li>Point-to-point: AMQP (with proprietary extensions) over TCP.</li>
</ul>


<p>We will use two persistence approaches:</p>
<ul>
	<li>GFS shared storage.</li>
	<li>Local unshared disk storage on a node.</li>
</ul>


<p>The first cluster implementation will be entirely virtual synchrony and GFS: the SAN scenario.</p>

<p>Next iteration will support the local store only scenario but still replicate via EVS.</p>

<p>Subsequent optimization iterations will use primary-backup replication and proxies to increase throughput by reducing the multicast packets appearing on all switch ports. Cluster membership and exchange wiring will remain on EVS.</p>

<p>At all stages of development there will be concrete performance tests and measurements to ensure we are really optimizing.</p>

<p>Initially we will use the AMQP 0-9 WIP protocol, using field tables and message headers to pass additional cluster information. At some point we will migrate to 0-10, which is expected to have all the features needed for clustering.</p>


<h2><a name="ClusterDesignNote-Sessions"></a>Sessions</h2>

<p>A <em>session</em> identifies a client-broker relationship that can outlive a single connection. It will be formalized in AMQP 0-10.</p>

<p><em>Orderly closure</em> of a connection by either peer ends the session. On unexpected disconnect the session remains viable for some timeout period, allowing the client to reconnect to the cluster and resume.</p>

<p>Events in the AMQP 0-8 spec that are triggered by closing a connection (e.g. deleting auto-delete queues) are instead triggered by the closure (or timeout) of a session.</p>
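
<p>A rough sketch of that lifecycle, assuming a simple timeout model (the class and the 60-second default are illustrative assumptions, not part of the spec): an orderly close ends the session immediately, while an unexpected disconnect only starts the resume timer.</p>

<pre>
import time

class Session:
    """Illustrative session record with a resume timeout."""
    def __init__(self, session_id, timeout=60.0):
        self.session_id = session_id
        self.timeout = timeout
        self.expires_at = None        # None while a connection is attached

    def orderly_close(self):
        # Peer closed cleanly: the session ends now (auto-delete queues etc. fire).
        self.expires_at = time.time()

    def unexpected_disconnect(self):
        # Connection dropped: keep the session viable for the timeout period.
        self.expires_at = time.time() + self.timeout

    def try_resume(self):
        # A reconnecting client may resume only while the session is still viable.
        if self.expires_at is not None and time.time() &gt;= self.expires_at:
            raise RuntimeError("session expired")
        self.expires_at = None        # attached again
</pre>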



<h1><a name="ClusterDesignNote-TypesofstateWehavetoconsiderseveralkindsofstate%3A"></a>Types of state We have to consider several kinds of state:</h1>

<ul>
	<li><em>Cluster Membership</em>: Active cluster members (nodes) and data about them.</li>
	<li><em>AMQP Wiring</em>: Names and properties of queues, exchanges and bindings.</li>
	<li><em>AMQP Content</em>: Data in messages on queues.</li>
	<li><em>Session</em>: Conversation with a single client, including references.</li>
</ul>


<p>Data must be replicated and stored such that:</p>
<ul>
	<li>A client knows which node(s) can be used for failover.</li>
	<li>After a failover, the client can continue its session uninterrupted.</li>
	<li>No acknowledged messages or commands are lost.</li>
	<li>No messages or commands are applied twice.</li>
</ul>



<p>Cluster membership, wiring and session identities are low volume, and will be replicated using virtual synchrony so the entire cluster has a consistent picture.</p>

<p>Queue content is high volume so it is replicated point-to-point using primary-backup to avoid flooding the network.</p>

<p>Session state is potentially high volume and only relevant to a single client, so it is also replicated point-to-point.</p>

<p>How to choose the number and location of backup nodes for a given queue or session is an open question. Note that the choice is independent for every queue and session in principle, but in practice they will probably be grouped in some way.</p>

<h3><a name="ClusterDesignNote-TheClusterMap%3AMembershipandWiring"></a>The Cluster Map: Membership and  Wiring</h3>

<p>Membership, wiring and session changes are low volume. They are replicated to the entire cluster symmetrically using EVS.</p>

<p>Wiring includes:</p>
<ul>
	<li>exchange names and properties</li>
	<li>queue names, properties and bindings.</li>
</ul>


<p>Membership data includes:</p>
<ul>
	<li>address of each node</li>
	<li>state of health of each node</li>
	<li>primary/backup for each queue/exchange</li>
	<li>session names, primary/backup for each session.</li>
</ul>
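
<p>As a concrete, purely illustrative sketch, the replicated cluster map could be little more than a set of dictionaries (field names here are assumptions, not a wire format); every member applies the same totally-ordered EVS updates, so all copies stay identical.</p>

<pre>
from dataclasses import dataclass, field

@dataclass
class Node:
    address: str
    healthy: bool = True

@dataclass
class QueueInfo:
    properties: dict
    bindings: list        # (exchange, binding key) pairs
    primary: str          # node address
    backups: list         # node addresses

@dataclass
class ClusterMap:
    nodes: dict = field(default_factory=dict)      # keyed by node address
    exchanges: dict = field(default_factory=dict)  # exchange name to properties
    queues: dict = field(default_factory=dict)     # queue name to QueueInfo
    sessions: dict = field(default_factory=dict)   # session id to (primary, backups)

    def apply(self, update):
        # Updates are delivered in the same order to every member (EVS),
        # so applying them keeps every copy of the map consistent.
        update(self)
</pre>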



<h2><a name="ClusterDesignNote-ProxiesandQueueContent"></a>Proxies and Queue Content</h2>

<p>For primary-backup replication each queue has a primary and a backup node. Other nodes act as proxies to the primary. The client is unaware of the distinction; it sees an identical picture regardless of which node it connects to.</p>

<p>Note a single cluster member may have a mix of primary, backup and proxy queues.</p>

<p><b>TODO</b>: Ordering issues with proxies and put-back messages (reject, transaction rollback) or selectors.</p>

<h3><a name="ClusterDesignNote-FragmentedSharedQueues"></a>Fragmented Shared Queues</h3>

<p>A shared queue has reduced ordering requirements and increased distribution requirements. <em>Fragmenting</em> a shared queue is a special type of replication. The queue is broken into a set of disjoint sub-queues each on a separate node to distribute load.</p>

<p>Each fragment (sub-queue) content is replicated to backups just like a normal queue, independently of the other fragments.</p>

<p>The fragments collaborate to create the appearance of a single queue. Fragments store incoming messages in the local queue, and serve local consumers from the local queue whenever possible. When a fragment does not have messages to satisfy its consumers it consumes messages from other fragments in the group. Proxies to a fragmented queue will consume from the "nearest" fragment if possible.</p>
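
<p>A toy, in-memory sketch of that dispatch rule (no networking, names are illustrative): serve consumers from the local fragment when possible, otherwise pull from a sibling fragment in the group.</p>

<pre>
from collections import deque

class Fragment:
    """One fragment of a shared queue; siblings are the other fragments."""
    def __init__(self, name):
        self.name = name
        self.messages = deque()
        self.siblings = []

    def publish(self, message):
        # Incoming messages always land on the local fragment.
        self.messages.append(message)

    def consume(self):
        # Serve local consumers from the local fragment whenever possible...
        if self.messages:
            return self.messages.popleft()
        # ...otherwise consume from another fragment in the group.
        for sibling in self.siblings:
            if sibling.messages:
                return sibling.messages.popleft()
        return None   # the shared queue as a whole is empty

# Usage: a, b = Fragment("node-a"), Fragment("node-b")
#        a.siblings, b.siblings = [b], [a]
#        b.publish("m1"); assert a.consume() == "m1"
</pre>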

<p><b>TODO</b>: Proxies can play a more active role. Ordering guarantees: we can provide "same producer to same consumer preserves order" since messages from the same producer always go on the same fragment queue. This may break down in the presence of failover unless we remember which fragment received messages from the client and proxy to the same one on the failover replica.</p>




<h1><a name="ClusterDesignNote-SessionState"></a>Session State</h1>

<p>Session state includes:</p>
<ul>
	<li>open channels, channel attributes (qos, transactions etc.).</li>
	<li>active consumers.</li>
	<li>open references.</li>
	<li>completed command history.</li>
	<li>commands in flight.</li>
	<li>open transactions</li>
	<li>exclusive/private queues.</li>
</ul>


<p>The broker a client is connected to is the session primary; one or more other brokers are session backups. On failure of the primary, the client fails over to a backup as described below.</p>

<p>The client can also fail over to a non-backup node which retrieves session state from the backup.</p>

<p>The primary-backup protocol must guarantee that the backup has sufficient data to resume at all times without becoming a synchronous bottleneck.</p>

<h2><a name="ClusterDesignNote-Inflightcommands"></a>In-flight commands</h2>

<p>Both peers must store sent commands for possible resend and received commands to detect possible duplicates in a failover.</p>

<p>To keep session size finite a peer can:</p>
<ul>
	<li>forget sent commands when we know the other peer has received them.</li>
	<li>forget received commands when we know the other peer will not resend them.</li>
</ul>


<p>An algorithm to achieve this:</p>

<pre>
self_received(r):
    if r.is_response:
        peer_received(sent[r.responds_to_id])
    for s in sent[0..r.process_mark]:
        peer_received(s)

peer_received(s):
    sent.erase(s)   # forget s, but also...
    # Peer will never resend commands &lt;= s.process_mark.
    for r in received[0..s.process_mark]:
        received.erase(r)
</pre>

<p>The weakest rules for interop between peers A and B are:</p>

<ul>
	<li>A MAY forget a sent command when A knows B received it.</li>
	<li>A MUST NOT re-send a command after <em>B could know that</em> A knows B received it.</li>
	<li>A MUST remember received commands till A knows that B knows A received it.</li>
</ul>


<p>Or in protocol terms:</p>

<ul>
	<li>A MAY forget sent command N when it receives a response to N.</li>
	<li>A MUST NOT resend N after sending a response to a response to N.</li>
	<li>A MUST remember received command N until it has both sent M responding to N <em>and</em> received a response to M.</li>
</ul>
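
<p>A self-contained sketch of the bookkeeping these rules imply, assuming each command carries its id, the id it responds to (if any), and the sender's process mark (the highest peer command id it has fully processed). The class is illustrative, not the real session implementation.</p>

<pre>
class Peer:
    """Illustrative command-window bookkeeping for one side of a session."""
    def __init__(self):
        self.next_id = 1
        self.sent = {}         # our command id to command, kept for possible resend
        self.received = set()  # peer command ids remembered for duplicate detection
        self.process_mark = 0  # highest peer command id we have fully processed

    def send(self, responds_to=None):
        cmd = {"id": self.next_id, "responds_to": responds_to,
               "process_mark": self.process_mark}
        self.next_id += 1
        self.sent[cmd["id"]] = cmd       # remember until the peer confirms receipt
        return cmd

    def receive(self, cmd):
        self.received.add(cmd["id"])
        self.process_mark = max(self.process_mark, cmd["id"])
        # The peer has received anything it responds to or covers by its mark.
        if cmd["responds_to"] is not None:
            self.peer_received(cmd["responds_to"])
        for s in [i for i in self.sent if i &lt;= cmd["process_mark"]]:
            self.peer_received(s)

    def peer_received(self, sent_id):
        mark = self.sent.pop(sent_id, {}).get("process_mark", 0)
        # The peer will never resend commands covered by the mark we sent with it.
        self.received = {r for r in self.received if r &gt; mark}
</pre>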



<h2><a name="ClusterDesignNote-Resumingachannel"></a>Resuming a channel</h2>

<p>When a channel is first opened, the broker provides a session-id. If there is a failure, the client can connect to the session backup broker and resume the channel as follows (the synchronous code is for illustration only):</p>

<p><em>TODO does it matter if the new channel number is different from the old?</em></p>

<ol>
	<li>Client client_resume:
<pre>
client_resume():
    send(command=channel_resume, command_id=0,
         session_id=resume_id,
         process_mark=pre_crash_process_mark)
    ok = receive(command=channel_ok)
    self_received(ok)   # Clean up to peer's process mark.
    resend()
    continue_session_as_normal()
</pre></li>
	<li>Both sides resend():
<pre>
resend():
    # Resend in-flight messages.
    for s in sent:
        # Careful not to respond to a command we haven't received yet.
        if s.is_response:
            until received.contains(s.responds_to_id):
                self_received(receive())
        send(s)   # Original command ids and process_mark.
</pre></li>
	<li>Broker broker_received_channel_resume(r):
<pre>
broker_received_channel_resume(r):
    session = sessions[r.session_id]
    self_received(r)   # Up to date with peer's process mark.
    send(command=channel_ok, command_id=0,
         process_mark=session.process_mark)
    resend()
    continue_session_as_normal()
</pre></li>
</ol>



<h2><a name="ClusterDesignNote-Replicatingsessionstate."></a>Replicating session state.</h2>

<p><em>TODO: Need to minimize the primary synchronously waiting on the backup, while ensuring that the primary always knows that the backup is in a state that satisfies the client's expectations for failover. See recent email thread between me &amp; gordon.</em></p>




<h1><a name="ClusterDesignNote-MappingofAMQPcommandstoreplicationmechanisms"></a>Mapping of AMQP commands  to replication mechanisms</h1>

<h2><a name="ClusterDesignNote-queue.declare%2Fbind%2Fdelete%2Cexchange.declare%2Fdelete"></a>queue.declare/bind/delete, exchange.declare/delete</h2>

<p>Update cluster map.  Local broker creates the initial queue as primary and establishes a backup.</p>

<p><em>Private queue</em>: backed up on the <em>session backup</em>.</p>

<p><em>Shared queue</em>: the local primary queue is the first <em>primary fragment</em>. Other brokers that receive publishes for the queue can proxy to this fragment or create their own local fragment (<em>TODO: How do we decide?</em>). Consumes are always served from the local fragment if possible, otherwise proxied to another fragment <em>(TODO: load balancing algorithms to choose the appropriate fragment)</em>.</p>


<h2><a name="ClusterDesignNote-message.transfer%2Fbasic.publish%28clienttobroker%29"></a>message.transfer/basic.publish (client to broker)</h2>

<p>Local broker evaluates the binding to determine which queue(s) receive the message.</p>
<ul>
	<li>primary queues: update local queue, replicate to backup.</li>
	<li>proxy queues: forward to primary<br/>
(When the proxy is also a backup we can optimize out the replication step.)</li>
</ul>


<p>If the message is delivered to more than one proxy queue on the same node, we just relay the message once. Brokers must be able to differentiate between normal message transfer and proxy/replication transfer so that when they evaluate the binding they only apply the message to local primary/backup queues respectively, and don't attempt to re-forward messages.</p>
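
<p>A simplified, self-contained sketch of that routing decision (the data structures are assumptions for illustration): the binding is evaluated once, local primary queues are updated, and each remote primary node receives the message at most once together with the list of queues it should apply it to.</p>

<pre>
class Broker:
    """Toy model: 'queues' maps queue name to (role, node); role is 'primary' or 'proxy'."""
    def __init__(self, queues):
        self.queues = queues
        self.local = {q: [] for q, (role, _) in queues.items() if role == "primary"}
        self.outbound = []                 # (node, message, [queue, ...]) forwards

    def publish(self, message, matched_queues, is_cluster_transfer=False):
        forwards = {}                      # remote node to list of queue names
        for q in matched_queues:
            role, node = self.queues[q]
            if role == "primary":
                self.local[q].append(message)            # update the local queue
                # (replication to the queue's backup would happen here)
            elif role == "proxy" and not is_cluster_transfer:
                forwards.setdefault(node, []).append(q)  # forward once per node
        for node, qs in forwards.items():
            self.outbound.append((node, message, qs))

# b = Broker({"q1": ("primary", "A"), "q2": ("proxy", "B"), "q3": ("proxy", "B")})
# b.publish("m", ["q1", "q2", "q3"])   # one local enqueue, one forward to node B
</pre>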

<p><em>TODO: there are a few options</em>:</p>
<ul>
	<li>Use custom backup/proxy exchanges and pass an explicit list of queues to receive the message in the header table.</li>
	<li>Use normal AMQP commands over a marked connection/channel.</li>
	<li>Introduce new cluster commands.</li>
</ul>



<h2><a name="ClusterDesignNote-message.transfer%28brokertoclient%29%2Cmessage.deliver"></a>message.transfer(broker to client), message.deliver</h2>

<ul>
	<li>primary: replicate the deliver to backup(s), then deliver to the client.</li>
	<li>proxy: pass through to client.</li>
</ul>


<p>Before sending a message to a client, the primary must be sure that the session backup 'knows' about the delivery; i.e. in the event of primary failure the backup knows about unacked messages and will be able to handle an ack or reject for them, or resend or requeue them.</p>

<p>If we can define a clear and deterministic algorithm for message dispatch, and if we replicate all 'inputs' in order then that should be sufficient.</p>

<p>Selectors slightly complicate the picture, as do multiple consumers and flow control particularly for shared queues where the consumers could be from different sessions.</p>

<p>In the case of an exclusive or private queue all the inputs come from a single session. If all session requests are handled serially on both primary and backup then dispatch should be deterministic; if separate threads were used to process separate queues that determinism would be lost, as the allocation of delivery tags would depend on the interleaving of those threads.</p>

<p>One way of avoiding the need for deterministic dispatch would be for the primary to send a message to the backup(s) to indicate an allocation before the deliver is sent to the client. This could inform the backup of the queue in question, the message id and the delivery tag/request id. The big drawback is that it requires a round-trip to the backup before each deliver and would really affect throughput.</p>
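
<p>For illustration only, the allocation record that approach implies might carry no more than the queue, the message id and the delivery tag; the synchronous wait for the backup's acknowledgement before every deliver is exactly the round trip that would hurt throughput.</p>

<pre>
class SessionBackup:
    """Toy backup: remembers allocations so it can ack, reject or requeue after failover."""
    def __init__(self):
        self.unacked = {}                        # delivery tag to (queue, message id)

    def record_allocation(self, queue, message_id, delivery_tag):
        self.unacked[delivery_tag] = (queue, message_id)
        return True                              # stands in for the backup's ack

def deliver(backup, client_log, queue, message_id, delivery_tag):
    # The primary must not deliver until the backup knows about the allocation;
    # this per-message round trip is the throughput cost discussed above.
    if backup.record_allocation(queue, message_id, delivery_tag):
        client_log.append((queue, message_id, delivery_tag))
</pre>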

<p>This looks like an area that needs some specific focus. Can we convince ourselves of a clear and deterministic dispatch algorithm, or are there other solutions that would avoid requiring this without too much synchronous waiting?</p>



<h2><a name="ClusterDesignNote-message.consume%2Fbasic.consume"></a>message.consume/basic.consume</h2>
<ul>
	<li>proxy: forward consume. No replication, client will re-establish consumers.</li>
	<li>primary: register consumer.</li>
</ul>



<h2><a name="ClusterDesignNote-basic.ack%2Fmessage.ok%28fromclient%29"></a>basic.ack/message.ok(from client)</h2>
<ul>
	<li>proxy: forward</li>
	<li>primary: mark message processed, replicate to backups.</li>
</ul>



<h2><a name="ClusterDesignNote-basic.ack%2Fmessage.ok%28frombroker%29"></a>basic.ack/message.ok(from broker)</h2>
<ul>
	<li>proxy: forward to client</li>
	<li>client: mark message processed.</li>
</ul>



<h2><a name="ClusterDesignNote-basic.reject%2Fmessage.reject"></a>basic.reject / message.reject</h2>

<p>Similar to the processing of basic.ack. However here the message might be requeued or might be moved to a dead letter queue. Ignoring the dead letter queue in the first instance, the backup would merely cancel the effect of the basic.allocate on receiving the basic.reject.</p>


<h2><a name="ClusterDesignNote-reference.open%2Fapppend%2Fclose%28clienttobroker%29"></a>reference.open/apppend/close (client to broker)</h2>
<ul>
	<li>proxy: replicate to session backup, forward to primary.</li>
	<li>primary: process.</li>
</ul>



<h2><a name="ClusterDesignNote-reference.open%2Fapppend%2Fclose%28brokertoclient%29"></a>reference.open/apppend/close (broker to client) **</h2>
<ul>
	<li>primary: send open/append/close.</li>
	<li>proxy: replicate to session backup, forward to client.</li>
</ul>



<h2><a name="ClusterDesignNote-Allcommands"></a>All commands</h2>
<ul>
	<li>proxy replicates required command history to session backup.</li>
</ul>




<h1><a name="ClusterDesignNote-ClientBrokerProtocol"></a>Client-Broker Protocol</h1>

<p>Normal AMQP with the following extensions.</p>

<p>Initial connection:</p>
<ul>
	<li>Pass session name as 0-9 connection identifier or via arguments table.</li>
	<li>Broker provides list of failover replicas in arguments table.</li>
</ul>


<p>During connection:</p>
<ul>
	<li>Client can subscribe to a special "cluster exchange" for messages carrying updates to failover candidates.</li>
</ul>


<p>On failure:</p>
<ul>
	<li>client chooses a failover node randomly from the most recent list.</li>
	<li>the cluster list may identify "preferred" failover candidates, as in the sketch after this list.</li>
</ul>
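
<p>A minimal sketch of that choice, assuming the candidate list marks preferred members (the weighting toward preferred nodes is an arbitrary illustration):</p>

<pre>
import random

def choose_failover_node(candidates):
    """candidates: list of (address, preferred) pairs from the latest cluster update."""
    preferred = [addr for addr, is_preferred in candidates if is_preferred]
    pool = preferred if preferred else [addr for addr, _ in candidates]
    return random.choice(pool)   # random choice spreads reconnecting clients around

# choose_failover_node([("b1:5672", False), ("b2:5672", True)])  # returns "b2:5672"
</pre>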


<p>On re-connect:</p>
<ul>
	<li>0-9 resume command identifies the session.</li>
	<li>Client rebuilds conversational state:
<ul>
	<li>opens channels</li>
	<li>creates consumers</li>
	<li>establishes</li>
</ul></li>
	<li>replays unacknowledged commands and continues the session.</li>
</ul>


<p>Note: the client sends conversational state data in messages to a special system exchange. We can't simply use standard AMQP to rebuild channel state, as we would end up with channels with a different command numbering from the interrupted session. For transparency we also want to distinguish reconnection from resumed "normal" operation.</p>

<p>At this point the session can continue.</p>


<h1><a name="ClusterDesignNote-BrokerBrokerProtocol"></a>Broker-Broker Protocol</h1>

<p>Broker-broker communication uses extended AMQP over specially identified connections and channels (identified in the connection negotiation argument table.)</p>

<p><b>EVS</b>: First implementation is entirely EVS, all members have a common shared picture of the entire cluster contents.</p>

<p><b>Proxying</b>: acting as a proxy, a broker forwards commands from client to primary and vice versa. The proxy is as transparent and stateless as possible. However, a proxy must renumber channels and commands, since a single incoming connection may be proxied to more than one outbound connection, so it does need to keep some state. This state is part of the session state replicated to the session backup.</p>
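
<p>A sketch of the renumbering state a proxy would have to keep (and replicate to the session backup); the structure is an assumption for illustration, not the actual proxy implementation.</p>

<pre>
class ProxyChannel:
    """Maps the client's command ids onto the ids used on the outbound connection."""
    def __init__(self):
        self.next_outbound_id = 1
        self.id_map = {}                 # client command id to outbound command id

    def renumber(self, client_command_id):
        outbound_id = self.next_outbound_id
        self.next_outbound_id += 1
        self.id_map[client_command_id] = outbound_id
        return outbound_id

    def to_client_id(self, outbound_command_id):
        # Reverse lookup when relaying responses back to the client.
        for client_id, out_id in self.id_map.items():
            if out_id == outbound_command_id:
                return client_id
        raise KeyError(outbound_command_id)
</pre>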

<p><b>Queue/fragment replication</b>: Depends on whether AMQP or GFS is used to replicate content.</p>

<p><b>AMQP</b>: For enqueue, use the AMQP transfer command to transfer content to the backup(s). For dequeue, use the AMQP get command to indicate the message was removed - no data is transferred for get over a replication channel.</p>

<p><em>TODO</em>: this use of get is strained, it starts to look like we may need a separate replication class of commands.</p>

<p><b>GFS</b>: Queue state is updated in journal files. On failover, the backup reconstructs queue state from the journal.</p>

<p><b>Session replication</b>: The broker must replicate a command (and get confirmation it was replicated) before responding. For async clients this can be done in a pair of asynchronous streams, i.e. we don't have to wait for a response to command A before we forward command B.</p>

<p>Session data is replicated via AMQP on special connections. Primary forwards all outgoing requests and incoming responses to the session backup. Backup can track the primary request/response tables and retransmit messages.</p>

<p><em><b>TODO</b></em>: 0-9 references force us to have heavy session backup, because message data on a reference is not associated with any queue and therefore can't be backed up in a queue backup. If references are removed in 0-10, revisit the need for session backups; we may be able to compress session data enough to store it in the cluster map.</p>


<h1><a name="ClusterDesignNote-PersistenceandRecovery"></a>Persistence and Recovery</h1>

<h2><a name="ClusterDesignNote-Competingfailuremodes%3A"></a>Competing failure modes:</h2>

<p><b>Tibco</b>: fast when running clean, but performance over time shows GC "spikes". Single journal for all queues; "holes" in the log have to be garbage collected to re-use the log. One slow consumer affects everyone because it causes fragmentation of the log.</p>

<p><b>MQ</b>: write to journal, write journal to DB, read from DB. Consistent &amp; reliable but slow.</p>

<p><b>Street homegrown solutions</b>: transient MQ with home-grown persistence. Can we get more design details for these solutions?</p>



<h2><a name="ClusterDesignNote-Persistenceoverview"></a>Persistence overview</h2>

<p>There are 3 reasons to persist a message:</p>

<p><b>Durable messages</b>: must be stored to disk across broker shutdowns or failures.</p>
<ul>
	<li>stored when received.</li>
	<li>read during start-up.</li>
	<li>must be removed after delivery.</li>
</ul>


<p><b>Reliability</b>: recover after a crash.</p>
<ul>
	<li>stored when received.</li>
	<li>read during crash recovery.</li>
	<li>must be removed after delivery.</li>
</ul>


<p><b>Flow-to-disk</b>: to reduce memory use for full queues.</p>
<ul>
	<li>stored when memory gets tight.</li>
	<li>read when delivered.</li>
	<li>must be removed after delivery.</li>
</ul>



<p>Durable and reliable cases are very similar: storage time is performance-critical (blocks response to sender) but reading is not and cleanup can be done by an async thread or process.</p>

<p>For flow-to-disk, when queues are full, both storing and reading are performance-critical.</p>

<p>So it looks like the same solution will work for durable and reliable.</p>

<p>Flow-to-disk has different requirements but it would be desirable to re-use some or all of the durable/reliable solution. In particular, if flow-to-disk is combined with durable/reliable it would be wasteful to write the message to disk a second time - instead it would seem better to keep an in-memory index that allows messages to be read quickly from the reliable/durable store.</p>

<p>We also need to persist <b>wiring</b> (Queues/Exchanges/Bindings), but this is much less performance critical. The entire wiring model is held in memory so wiring is only read at startup, and updates are low volume and not performance-critical. A simple database should suffice.</p>
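
<p>Since wiring is low volume and only read at startup, even an embedded SQL store would do. A hypothetical sketch using Python's built-in sqlite3 module (the schema is an illustration, not a proposal):</p>

<pre>
import sqlite3

def open_wiring_store(path="wiring.db"):
    db = sqlite3.connect(path)
    db.executescript("""
        CREATE TABLE IF NOT EXISTS exchanges (name TEXT PRIMARY KEY, type TEXT);
        CREATE TABLE IF NOT EXISTS queues    (name TEXT PRIMARY KEY, durable INTEGER);
        CREATE TABLE IF NOT EXISTS bindings  (exchange TEXT, queue TEXT, key TEXT);
    """)
    return db

def declare_queue(db, name, durable=True):
    # Wiring updates are rare, so a synchronous commit per change is acceptable.
    db.execute("INSERT OR REPLACE INTO queues VALUES (?, ?)", (name, int(durable)))
    db.commit()

def load_wiring(db):
    # The entire wiring model is read into memory once, at startup.
    return {"exchanges": db.execute("SELECT * FROM exchanges").fetchall(),
            "queues":    db.execute("SELECT * FROM queues").fetchall(),
            "bindings":  db.execute("SELECT * FROM bindings").fetchall()}
</pre>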



<h1><a name="ClusterDesignNote-Journals"></a>Journals</h1>

<h2><a name="ClusterDesignNote-Overview"></a>Overview</h2>

<p>A journal is a sequential record of actions taken (e.g. messages enqueued, responses sent), sufficient to reconstruct the state of the journalled entity (e.g. a queue) in the case of failure and recovery.</p>


<p><b>TODO</b>: <em>Journal indexing, async journal (thruput vs. latency), journal as common API for backups and disk store?</em></p>

<p><em><b>TODO</b></em>: <em>Windows for error in journalling - how to make disk update and network ack atomic?  How do other technologies handle it?</em></p>

<p><em><b>TODO</b></em>: <em>References strike again: where do they go in a journal-per-queue?</em></p>

<p><em><b>TODO</b></em>: <em>Journal per broker pros/cons</em></p>


<h2><a name="ClusterDesignNote-Useofjournals"></a>Use of journals</h2>

<p>For reliability and durability we will use</p>
<ul>
	<li>Queue journal (per queue) records enqueue/dequeues and acknowledgements.</li>
	<li>Session journal (per session) records references in progress.</li>
</ul>


<p>The broker writes enqueue and dequeue records to the end of the active journal file. When the file reaches a fixed size it starts a new one.</p>

<p>A cleanup agent (thread or process) removes, recycles or compacts journal files that have no live (undelivered) messages. (References complicate the book-keeping a little but don't alter the conceptual model.)</p>

<p>Recovery or restart reconstructs the queue contents from the enqueue/dequeue records in the journal.</p>
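
<p>A toy append-only journal in the spirit of this design (file layout and record format are assumptions): enqueue and dequeue records are appended to the active file, a new file is started at a fixed size, and recovery replays the records to rebuild the queue contents.</p>

<pre>
import json, os

class QueueJournal:
    """Illustrative per-queue journal: one JSON record per line, fixed-size files."""
    def __init__(self, directory, max_bytes=1024 * 1024):
        self.directory = directory
        self.max_bytes = max_bytes
        os.makedirs(directory, exist_ok=True)
        self.file_no = 0
        self.out = open(self._path(0), "a")

    def _path(self, n):
        return os.path.join(self.directory, "journal.%06d" % n)

    def _append(self, record):
        self.out.write(json.dumps(record) + "\n")
        self.out.flush()                        # a real journal would fsync here
        if self.out.tell() &gt;= self.max_bytes:   # start a new file at a fixed size
            self.out.close()
            self.file_no += 1
            self.out = open(self._path(self.file_no), "a")

    def enqueue(self, message_id, body):
        self._append({"op": "enq", "id": message_id, "body": body})

    def dequeue(self, message_id):
        self._append({"op": "deq", "id": message_id})

    def recover(self):
        # Rebuild queue contents from the enqueue/dequeue records on disk.
        live = {}
        for name in sorted(os.listdir(self.directory)):
            with open(os.path.join(self.directory, name)) as f:
                for line in f:
                    r = json.loads(line)
                    if r["op"] == "enq":
                        live[r["id"]] = r["body"]
                    else:
                        live.pop(r["id"], None)
        return live
</pre>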

<p>Flow-to-disk can re-use the journal framework, with a simple extension: the broker keeps an in-memory index of live messages in the journal.</p>

<p>If flow-to-disk is combined with reliability then messages are automatically journalled on arrival, so flow-to-disk can simply delete them from memory and use the in-memory index to read them for delivery.</p>

<p>Without reliability flow-to-disk is similar except that messages are only journalled if memory gets tight.</p>

<p><b>Disk thrashing</b>: Why do we think skipping disk heads around between multiple journals will be better than seeking up and down a single journal? Are we assuming that we only need to optimize the case where long sequences of traffic tend to be for the same queue?</p>

<p><b>No write on fast consume</b>: Optimization - if we can deliver (and get ack) faster than we write then no need to write. How does this interact with HA?</p>

<p><b>Async journalling</b>: writing to client, writing to journal, acks from client and acks from journal are separate async streams? So if we get the client ack before the journalling stream has written the journal we cancel the write? But what kind of ack info do we need? Need a diagram of interactions, failure points and responses at each point. Start simple and optimize, but don't rule out optimizations.</p>


<h2><a name="ClusterDesignNote-Whataboutdisklessreliability%3F"></a>What about diskless reliability?</h2>

<p>Is memory+network replication with no disk a viable option for high-speed transient message flow? May be faster, but can't support durable messages/persistent queues. We will lose messages in total failure or multiple failures where all backups fail, but we can survive single failures and will run a lot faster than diskful.</p>




<h1><a name="ClusterDesignNote-Virtualsynchrony"></a>Virtual synchrony</h1>

<p><b>TODO</b>: Wiring &amp; membership via virtual synchrony</p>

<p><b>TODO</b>: journaling, speed. Will file-per-q really help with disk burnout?</p>


<h1><a name="ClusterDesignNote-Configuration"></a>Configuration</h1>

<h2><a name="ClusterDesignNote-SimplifyingpatternsPossiblewaystoconfigureacluster%3A"></a>Simplifying patterns Possible ways to configure a cluster:</h2>
<ul>
	<li>Virtual hosts as units of replication.</li>
	<li>Backup rings: all primary components in a broker use the same backup broker and vice-versa. Backups form rings.</li>
	<li>Broker component rings: all the components <em>except sessions</em> have the same backup broker. Session backups are chosen at random so a broker's load will be distributed rather than all falling on its backup.</li>
	<li>Disk management issues?</li>
	<li>Shared storage issues?</li>
</ul>



<h2><a name="ClusterDesignNote-Dynamicclusterconfiguration"></a>Dynamic cluster configuration</h2>
<ul>
	<li>Failover: the primary use case.</li>
	<li>Add node: backup, proxy, primary case?</li>
	<li>Redirect clients from loaded broker (pretend failure)</li>
	<li>Move queue primary from loaded broker/closer to consumers?</li>
	<li>Re-start after failover.</li>
</ul>


<p><b>Issue:</b> unit of failover/redirect is connection/channel but "working set" of queues and exchanges is unrelated. Use virtual host as unit for failover/relocation? It's also a queue namespace...</p>

<p>If a queue moves we have to redirect its <em>consumers</em>; we can't redirect entire channels! Channels in the same session may move between connections. Or do we rather depend on the broker to proxy?</p>

<p>Backups: chained backups rather than multi-backup? Ring backup? What about split brain, elections, quorums etc.</p>

<p>Should new backups acquire state from primary, from disk or possibly both? Depends on GFS/SAN vs. commodity hw?</p>



<h1><a name="ClusterDesignNote-Transactions"></a>Transactions</h1>

<h2><a name="ClusterDesignNote-Localtransactions"></a>Local transactions</h2>

<p>AMQP offers local and distributed transactions, however in a cluster a local transaction could involve queues that are distributed across several nodes.</p>

<p><em><b>TODO</b></em>: This complicates the model of a proxy as a simple forwarder. You cannot simply forward a local transaction involving queues on two separate primary brokers, the proxy has to be aware of the transaction.</p>

<p><em><b>TODO</b></em> Can we use point-to-point local transactions or do we have to turn this into a dtx? If dtx, who co-ordinates? Is every broker potentially a transaction co-ordinator?</p>

<p><em><b>TODO</b></em>: For distributed transactions, will the primary broker and its backups act as a single clustered resource manager for the resource set, or will a failure of one broker abort the transaction?</p>


<h2><a name="ClusterDesignNote-DistributedTransactions"></a>Distributed Transactions</h2>

<p>The prepare needs to be replicated so that if one node fails before completion another node can honour the guarantee to be able to commit or abort. It is also possible that the work of a transaction is distributed across more than one node anyway.</p>

<p>I think broadcasting all dtx commands over the group communication protocol is the most likely way to handle this.</p>

<p>The session in which the commands are initiated needs to be replicated also to allow clean resumption on failover.</p>




<h1><a name="ClusterDesignNote-OpenQuestions"></a>Open Questions</h1>

<p>Issues: double failure in backup ring: A -&gt; B -&gt; C. Simultaneous failure of A and B. C doesn't have the replica data to take over for A.</p>

<p>Java/C++ interworking - is there a requirement? Fail over from C++ to Java? Common persistence formats?</p>



				    
                    			    </td>
		    </tr>
	    </table>
	    <table border="0" cellpadding="0" cellspacing="0" width="100%">
			<tr>
				<td height="12" background="border/border_bottom.gif"><img src="border/spacer.gif" width="1" height="1" border="0"/></td>
			</tr>
		    <tr>
			    <td align="center"><font color="grey">Document generated by Confluence on Apr 22, 2008 02:47</font></td>
		    </tr>
	    </table>
    </body>
</html>