<html>
<head>
<title>Apache Qpid : Queue Replay</title>
<link rel="stylesheet" href="styles/site.css" type="text/css" />
<META http-equiv="Content-Type" content="text/html; charset=UTF-8">
</head>
<body>
<table class="pagecontent" border="0" cellpadding="0" cellspacing="0" width="100%" bgcolor="#ffffff">
<tr>
<td valign="top" class="pagebody">
<div class="pageheader">
<span class="pagetitle">
Apache Qpid : Queue Replay
</span>
</div>
<div class="pagesubheading">
This page last changed on Feb 20, 2007 by <font color="#0050B2">ritchiem</font>.
</div>
<h2><a name="QueueReplay-Background"></a>Background</h2>
<p>A lengthy discussion on replay in <a href="http://mail-archives.apache.org/mod_mbox/incubator-qpid-dev/200701.mbox/browser" title="Visit page outside Confluence">January 2007</a> (page #7, <a href="http://mail-archives.apache.org/mod_mbox/incubator-qpid-dev/200701.mbox/%3c000d01c73b18$b9e0a670$b092fea9@thinkpad%3e" title="Visit page outside Confluence">Thread</a>) highlighted a number of requirements and possible implementation options for adding replay to Qpid and AMQP. The requirements come from the desire to speed up the rate at which a consumer can read messages and to simplify its recovery when it starts. This page gives some background, a proposal and finally some implementation options for discussion.</p>
<h3><a name="QueueReplay-History"></a>History</h3>
<p>When an application updates the state of a single <b>Resource Manager</b>, e.g. a database or queue manager, it normally does so within the context of a local <b>Transaction</b>, and this transaction exhibits the following ACID properties:</p>
<ul>
<li><b>Atomicity</b> The results of the transaction are either all committed or all rolled back.</li>
<li><b>Consistency</b> The completed transaction transforms the resource from one known state to another. Inserting a row into a database or removing a message from a queue are common examples.</li>
<li><b>Isolation</b> Changes to the resource's state made by the transaction do not become visible outside the transaction until it commits.</li>
<li><b>Durability</b> The changes that result from committing the transaction survive subsequent system or media failures.</li>
</ul>
<h3><a name="QueueReplay-DistributedTransaction"></a>Distributed Transaction</h3>
<p>A distributed transaction is typically implemented using a <b>Two Phase Commit</b> (2PC) protocol, of which there are several variants, the best known being the X/Open XA specification. Where both the middleware and the consumer support XA, a separate <b>Transaction Manager</b> is used to coordinate the local transactions. The transaction manager coordinates atomicity at the global level, whilst each resource manager is responsible for the ACID properties of its local transactions.</p>
<p>These benefits do not come without cost.</p>
<ul>
<li>Increased transaction processing latency, typically due to the additional forced disk writes.</li>
<li>Applications can become blocked pending the resolution of an in-doubt global transaction.</li>
<li>Reduced concurrency</li>
<li>Multi-system deadlock</li>
<li>Administration complexity</li>
<li>Backing up the transaction manager requires either coordinating the backup of all transaction logs at the same time or suspending processing.</li>
</ul>
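<p>The two-phase commit protocol mentioned above can be sketched in Java to show where those costs come from. This is a simplified, in-memory illustration under assumed names, not the XA API: the <code>Participant</code> interface and <code>TwoPhaseCoordinator</code> class are hypothetical, and the decision log, timeouts and crash recovery that a real transaction manager needs are deliberately left out.</p>

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource manager taking part in a global transaction. */
interface Participant {
    boolean prepare(); // phase 1: vote to commit (true) or abort (false)
    void commit();     // phase 2: make the prepared work permanent
    void rollback();   // phase 2: undo the prepared work
}

/**
 * Simplified two-phase commit coordinator. Deliberately omitted: the decision log,
 * timeouts, and recovery of in-doubt participants after a coordinator crash.
 */
final class TwoPhaseCoordinator {
    /** Returns true if the global transaction committed on every participant. */
    static boolean complete(List<Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        // Phase 1: ask each participant to prepare; the first "no" vote aborts.
        for (Participant p : participants) {
            if (!p.prepare()) {
                // The participant that voted no is assumed to have discarded its own work;
                // roll back everyone who had already voted yes.
                for (Participant q : prepared) {
                    q.rollback();
                }
                return false;
            }
            prepared.add(p);
        }
        // Phase 2: every participant voted yes, so the outcome is commit.
        // A real transaction manager force-writes this decision to its log first.
        for (Participant p : prepared) {
            p.commit();
        }
        return true;
    }
}
```

<p>Between the two phases, every participant that has voted yes is in doubt: it may neither commit nor roll back on its own until it hears the coordinator's decision. If the coordinator fails at that moment, those participants (and the applications waiting on them) stay blocked, which is the second cost listed above.</p>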
<h3><a name="QueueReplay-IdempotenceandReplaying"></a>Idempotence and Replaying </h3>
<p>When a message is being moved from system A to system B (e.g. from WebSphereMQ to ORACLE), distributed transactions can be avoided if:</p>
<ul>
<li>System B can handle duplicates (or can detect them and deal with them accordingly) - i.e. it is idempotent.</li>
<li>System A can replay messages from a known stable point in history.</li>
</ul>
<p>The most common way of doing this is simply to manage the local transactions so that system B commits before system A. Replay then starts from the end of the last committed transaction on system A.</p>
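<p>The commit ordering described above can be sketched as follows. It is an in-memory illustration only: <code>Msg</code>, <code>IdempotentSink</code>, <code>ReplaySource</code> and <code>Transfer</code> are hypothetical names, and detecting duplicates with an increasing per-message sequence number is an assumption; any identifier that system B can compare with the last message it applied would do.</p>

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical message from system A, carrying an increasing sequence number. */
final class Msg {
    final long seq;
    final String payload;

    Msg(long seq, String payload) {
        this.seq = seq;
        this.payload = payload;
    }
}

/** Stand-in for system B: its rows and its last applied sequence number. */
final class IdempotentSink {
    final List<String> rows = new ArrayList<>();
    long lastAppliedSeq = 0; // in a real database, committed in the same local transaction as the rows

    /** Applies a message once; returns false for a duplicate (already applied) message. */
    boolean apply(Msg m) {
        if (m.seq <= lastAppliedSeq) {
            return false; // replayed after a crash: already applied, so skip it
        }
        rows.add(m.payload);
        lastAppliedSeq = m.seq;
        return true;
    }
}

/** Stand-in for system A: keeps its messages and replays from the last acknowledged one. */
final class ReplaySource {
    final List<Msg> log = new ArrayList<>();
    long lastAckedSeq = 0; // system A's stable point

    List<Msg> replayFromLastAck() {
        List<Msg> pending = new ArrayList<>();
        for (Msg m : log) {
            if (m.seq > lastAckedSeq) {
                pending.add(m);
            }
        }
        return pending;
    }

    void ack(long seq) {
        lastAckedSeq = seq;
    }
}

final class Transfer {
    /** Moves messages from A to B; returns right after B applies crashAtSeq, before A is acknowledged. */
    static void run(ReplaySource a, IdempotentSink b, long crashAtSeq) {
        for (Msg m : a.replayFromLastAck()) {
            b.apply(m);                // 1. system B commits first
            if (m.seq == crashAtSeq) {
                return;                // simulated crash between the two local commits
            }
            a.ack(m.seq);              // 2. then system A commits its acknowledgement
        }
    }
}
```

<p>If the process dies after system B has committed a message but before system A has recorded the acknowledgement, the next run replays that message from A's last stable point. B recognises it as already applied and skips it, so no distributed transaction is needed.</p>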
<p>Typically a MOM (message-oriented middleware) will delete a message as soon as it has been committed by all of its consumers.</p>
<h3><a name="QueueReplay-CommitisNotTheEnd"></a>Commit is Not The End</h3>
<p>If messages remain available for replay to a consumer after it has committed them, the point in time from which the consumer recovers can be stretched back to any point, for example:</p>
<ul>
<li>A few minutes ago</li>
<li>Trade ID FFS987654321</li>
<li>Start of day</li>
<li>End of yesterday</li>
<li>Friday</li>
</ul>
<p>This larger recovery window gives downstream consumers the flexibility to recover from more failure scenarios, for example:</p>
<ul>
<li>Retry an end of day batch job.</li>
<li>Replay due to reference data problem in target system.</li>
<li>Replay due to database or application failure.</li>
</ul>
<h2><a name="QueueReplay-ReplayasaFirstClassService"></a>Replay as a First Class Service</h2>
<p>When a traditional queue is opened for reading, the next message delivered is the oldest one that has not been destructively read (i.e. read and committed).</p>
<p>Working in isolation, a consumer manages its own local transactions with the message broker to confirm when a message or group of messages has been processed, stored and is <b>stable</b>. Each such local transaction leads to a <b>disk write</b> in the queue storage to mark the messages as read.</p>
<p>In this XA-free world the consumer relies on the messaging system to replay messages from the application's last known good state. As the consumer is always reading from a queue, the only extension needed to the queue's semantics is to let it be opened for reading from a known message, irrespective of whether that message has been committed or not. Replayed messages must, of course, arrive in the same order in which they were originally delivered.</p>
<p><div align="center"><img src="Queue Replay_attachments/TraditionalQueue.gif" border="0" /></div></p>
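<p>The destructive read of a traditional queue can be modelled in a few lines of Java for comparison. This is a hypothetical sketch, not Qpid's implementation: <code>TraditionalQueue</code> is an invented name, and its <code>storeWrites</code> counter simply stands in for the disk write that each committed local transaction causes in the queue's store.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a traditional queue with destructive, transactional reads. */
final class TraditionalQueue {
    private final Deque<String> messages = new ArrayDeque<>();
    private final List<String> uncommitted = new ArrayList<>();
    int storeWrites = 0; // stands in for forced writes to the queue's persistent store

    void publish(String m) {
        messages.addLast(m);
    }

    /** Delivers the oldest message not yet read in this transaction, or null if none. */
    String receive() {
        String m = messages.pollFirst();
        if (m != null) {
            uncommitted.add(m);
        }
        return m;
    }

    /** Commit: the delivered messages are destroyed, at the cost of a store write. */
    void commit() {
        uncommitted.clear();
        storeWrites++;
    }

    /** Rollback: return delivered-but-uncommitted messages to the head, in order. */
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            messages.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }
}
```

<p>Once <code>commit()</code> has run, the committed messages are gone; nothing in this model can deliver them again, which is the limitation replay is meant to remove.</p>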
<p>Many consumers of a guaranteed message flow write to a database, and this database is the consumer's view of its state; it is certainly where the consumer recovers from when it starts up. The traditional model of using a transaction manager, typically XA, to coordinate the local transactions on the database and the message broker is slow and not without its problems.</p>
<p><div align="center"><img src="Queue Replay_attachments/TraditionalQueueWithTX.gif" border="0" /></div></p>
<p>Another model is to have the messaging infrastructure support replay of messages from a known point in history, i.e. to correlate the current state of the consumer's database with the <b>last received</b> message from this channel that caused an update to the database. This is not an all-encompassing pattern but rather complements other ways to synchronize state between a message broker and a database.</p>
<p><div align="center"><img src="Queue Replay_attachments/ReplayableQueue.gif" border="0" /></div></p>
<h2><a name="QueueReplay-Requirements"></a>Requirements</h2>
<ul>
<li><b>Replay messages</b> from a queue from a given message identified by a message ID or a header property.</li>
<li><b>Administrative support</b> to purge messages from a queue as part of a business process such as <b>End of Day</b>.</li>
<li><b>Zero impact</b> on other queues and their consumers.</li>
</ul>
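<p>The first requirement amounts to finding the first position in the queue that matches a selector. A minimal sketch, assuming a hypothetical <code>StoredMessage</code> with an ID, an enqueue timestamp and a map of header properties; <code>ReplaySelectors</code> and its methods are illustrative names, not part of the Qpid or JMS APIs. The timestamp selector, which could express points such as "start of day", assumes messages are stored in enqueue order.</p>

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stored message: ID, enqueue time (epoch millis) and header properties. */
final class StoredMessage {
    final String id;
    final long enqueuedAt;
    final Map<String, String> headers;

    StoredMessage(String id, long enqueuedAt, Map<String, String> headers) {
        this.id = id;
        this.enqueuedAt = enqueuedAt;
        this.headers = headers;
    }
}

/** Hypothetical selectors that identify a replay start point. */
final class ReplaySelectors {
    static Predicate<StoredMessage> byMessageId(String id) {
        return m -> m.id.equals(id);
    }

    static Predicate<StoredMessage> byHeader(String name, String value) {
        return m -> value.equals(m.headers.get(name)); // missing header never matches
    }

    /** For points such as "start of day": first message enqueued at or after a time. */
    static Predicate<StoredMessage> enqueuedAtOrAfter(long epochMillis) {
        return m -> m.enqueuedAt >= epochMillis;
    }

    /** Index of the first matching message, or -1 if the selector matches nothing. */
    static int startIndex(List<StoredMessage> queue, Predicate<StoredMessage> selector) {
        for (int i = 0; i < queue.size(); i++) {
            if (selector.test(queue.get(i))) {
                return i;
            }
        }
        return -1;
    }
}
```

<p>A linear scan keeps the sketch short; a real store would presumably index message IDs and timestamps so that finding the start point does not require reading the whole queue.</p>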
<h2><a name="QueueReplay-Proposal%3AAReplayableQueue"></a>Proposal: A Replayable Queue</h2>
<p>Queues are the storage agents in AMQP, so they are the logical place to provide replay. A Replayable Queue (<b>RQ</b>) is not the default queue behavior but rather has to be <b>explicitly configured</b>. In many ways an RQ sits somewhere between a traditional queue and a transaction log such as <a href="http://howl.objectweb.org/" title="Visit page outside Confluence">HOWL</a>.</p>
<p>An RQ has the following properties:</p>
<ul>
<li>An RQ can only have a <b>single consumer</b>. Multiple consumers complicate the problem so I propose discounting them for now.</li>
<li>Messages are <b>not deleted</b> when consumed by a regular consumer. Acknowledging a message simply sets another property on it. Indeed, the consumer may never acknowledge messages at all, since each acknowledgement implies a write on the message broker to update the message's state.</li>
<li>When an RQ is opened for reading, the consumer must give a selector that will <b>identify a point in the queue</b> to begin message delivery from.</li>
<li>Administratively <b>defined points</b> in the queue exist. These points can be defined by an administration API and associated tooling and used as points to replay from.</li>
<li>Queues are <b>purged of messages</b> by the administration API or associated tooling. This allows external processes such as End of Day to initiate message archiving or deletion when it is safe to do so.</li>
<li>An RQ can be replicated. The implementation options are still open, e.g. SAN replication or dual writes.</li>
</ul>
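<p>These properties can be illustrated with a small in-memory model. This is a hypothetical sketch rather than a Qpid API: <code>ReplayableQueue</code>, <code>mark</code>, <code>readFrom</code>, <code>readFromMark</code> and <code>purgeBefore</code> are invented names, a plain offset stands in for the consumer's selector, persistence and replication are left out, and the single-consumer rule is assumed rather than enforced.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a Replayable Queue; one consumer is assumed, not enforced. */
final class ReplayableQueue {
    private final List<String> log = new ArrayList<>();      // append-only: reading never deletes
    private final Map<String, Long> marks = new HashMap<>(); // administratively defined points
    private long firstOffset = 0;                             // offset of log.get(0) after purges

    /** Appends a message and returns its offset, which stays valid across purges. */
    long publish(String message) {
        log.add(message);
        return firstOffset + log.size() - 1;
    }

    /** Admin API: names the current end of the queue, e.g. "start-of-day". */
    void mark(String name) {
        marks.put(name, firstOffset + log.size());
    }

    /** Reads from offset to the end; the consumer supplies the point it resumes from. */
    List<String> readFrom(long offset) {
        if (offset < firstOffset) {
            throw new IllegalArgumentException("offset " + offset + " has been purged");
        }
        List<String> out = new ArrayList<>();
        for (long off = offset; off < firstOffset + log.size(); off++) {
            out.add(log.get((int) (off - firstOffset)));
        }
        return out; // nothing is written back to the queue on read
    }

    /** Replays from a named, administratively defined point. */
    List<String> readFromMark(String name) {
        return readFrom(offsetOf(name));
    }

    /** Admin API: removes (or, in a real store, archives) every message before a mark. */
    void purgeBefore(String name) {
        long off = offsetOf(name);
        if (off <= firstOffset) {
            return; // already purged up to this point
        }
        log.subList(0, (int) (off - firstOffset)).clear();
        firstOffset = off;
    }

    private long offsetOf(String name) {
        Long off = marks.get(name);
        if (off == null) {
            throw new IllegalArgumentException("unknown mark: " + name);
        }
        return off;
    }
}
```

<p>Offsets stay stable across purges (via <code>firstOffset</code>), so a position the consumer stored in its own database, or an administrative mark, still refers to the same message after End of Day processing. A consumer that recorded offset <code>n</code> as its last received message resumes with <code>readFrom(n + 1)</code>.</p>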
<h3><a name="QueueReplay-Benefits"></a>Benefits</h3>
<ul>
<li>Because an RQ has a single consumer, it does not need to be written to when that consumer reads messages: it is the consumer's responsibility to provide the synchronization point when it connects. This can significantly speed up the consumer, as its bottleneck becomes its own database write.</li>
<li>A complete record of all messaging activity is available.</li>
</ul>
<h3><a name="QueueReplay-Downsides"></a>Downsides</h3>
<ul>
<li>The size of the store needs careful management so that its growth does not cause performance issues.</li>
</ul>
<h2><a name="QueueReplay-ImplementationOptionsinQpid."></a>Implementation Options in Qpid</h2>
<h3><a name="QueueReplay-Configuration"></a>Configuration</h3>
<h3><a name="QueueReplay-Storage"></a>Storage</h3>
<h3><a name="QueueReplay-Management"></a>Management</h3>
<h3><a name="QueueReplay-UsagefromJMS"></a>Usage from JMS</h3>
<br/>
<div class="tabletitle">
<a name="attachments">Attachments:</a>
</div>
<div class="greybox" align="left">
<img src="icons/bullet_blue.gif" height="8" width="8" alt=""/>
<a href="Queue Replay_attachments/TraditionalQueueWithTX.gif">TraditionalQueueWithTX.gif</a> (image/gif)
<br/>
<img src="icons/bullet_blue.gif" height="8" width="8" alt=""/>
<a href="Queue Replay_attachments/TraditionalQueue.gif">TraditionalQueue.gif</a> (image/gif)
<br/>
<img src="icons/bullet_blue.gif" height="8" width="8" alt=""/>
<a href="Queue Replay_attachments/ReplayableQueue.gif">ReplayableQueue.gif</a> (image/gif)
<br/>
</div>
</td>
</tr>
</table>
<table border="0" cellpadding="0" cellspacing="0" width="100%">
<tr>
<td height="12" background="border/border_bottom.gif"><img src="border/spacer.gif" width="1" height="1" border="0"/></td>
</tr>
<tr>
<td align="center"><font color="grey">Document generated by Confluence on Apr 22, 2008 02:47</font></td>
</tr>
</table>
</body>
</html>