summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-20 22:42:39 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-20 22:42:39 +0100
commit79823b5509b0853b0760879186adaa68e158aea3 (patch)
treea9e10df275e70d52c727a3307caddc010c137620
parent4345618a753e8667365e3eac3890ec81140336f5 (diff)
downloadrabbitmq-server-git-79823b5509b0853b0760879186adaa68e158aea3.tar.gz
Made messages be marked as delivered during prefetch *before* they are passed to the prefetcher.
-rw-r--r--src/rabbit_disk_queue.erl4
-rw-r--r--src/rabbit_queue_prefetcher.erl82
2 files changed, 43 insertions, 43 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 9e70f8c5a5..96125031dc 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -529,7 +529,7 @@ handle_cast({set_mode, Mode}, State) ->
mixed -> fun to_ram_disk_mode/1
end)(State));
handle_cast({prefetch, Q, From}, State) ->
- {ok, Result, State1} = internal_fetch(Q, true, true, false, State),
+ {ok, Result, State1} = internal_fetch(Q, true, false, false, State),
Cont = rabbit_misc:with_exit_handler(
fun () -> false end,
fun () ->
@@ -539,7 +539,7 @@ handle_cast({prefetch, Q, From}, State) ->
State3 =
case Cont of
true ->
- case internal_fetch(Q, false, false, true, State1) of
+ case internal_fetch(Q, false, true, true, State1) of
{ok, empty, State2} -> State2;
{ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem},
State2} -> State2
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index ad6b1ce2d4..f22aa6afe3 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -82,29 +82,36 @@
%% priority cast. Note that in the mean time, the mixed_queue could
%% have come along, found the prefetcher empty, asked it to
%% exit. This means the effective "reply" from the disk_queue will
-%% go no where. As a result, the disk_queue must perform no
-%% modification to the status of the message *or the queue* - do
-%% not mark the message delivered, and do not advance the queue. If
-%% it did advance the queue and the msg was then lost, then the
-%% queue would have lost a msg that the mixed_queue would not pick
-%% up.
+%% go no where. As a result, the disk_queue should not advance the
+%% queue. However, it does mark the messages as delivered. The
+%% reasoning is that if it didn't, there would be the possibility
+%% that the message was delivered without it being marked as such
+%% on disk. We must maintain the property that a message which is
+%% marked as non-redelivered really hasn't been delivered anywhere
+%% before. The downside is that should the prefetcher not receive
+%% this message, the queue will then fetch the message from the
+%% disk_queue directly, and this message will have its delivered
+%% bit set. The queue will not be advanced though - if it did
+%% advance the queue and the msg was then lost, then the queue
+%% would have lost a msg that the mixed_queue would not pick up.
%%
%% 3) The prefetcher hopefully receives the call from
%% prefetcher:publish(Msg). It replies immediately, and then adds
-%% to its internal queue. A cast is not sufficient here because the
-%% mixed_queue could come along, drain the prefetcher, thus
-%% catching the msg just sent by the disk_queue and then call
-%% disk_queue:fetch(Q) which is normal priority call, which could
-%% overtake a reply cast from the prefetcher to the disk queue,
-%% which would result in the same message being delivered
+%% to its internal queue. A cast is not sufficient as a pseudo
+%% "reply" here because the mixed_queue could come along, drain the
+%% prefetcher, thus catching the msg just sent by the disk_queue
+%% and then call disk_queue:fetch(Q) which is normal priority call,
+%% which could overtake a reply cast from the prefetcher to the
+%% disk queue, resulting in the same message being delivered
%% twice. Thus when the disk_queue calls prefetcher:publish(Msg),
%% it is briefly blocked. However, a) the prefetcher replies
%% immediately, and b) the prefetcher should never have more than
-%% one item in its mailbox anyway, so this should not cause a
-%% problem to the disk_queue.
+%% two items in its mailbox anyway (one from the queue process /
+%% mixed_queue and one from the disk_queue), so this should not
+%% cause a problem to the disk_queue.
%%
-%% 4) The disk_queue receives the reply, marks the msg at the head of
-%% the queue Q as delivered, and advances the Q to the next msg.
+%% 4) The disk_queue receives the reply, and advances the Q to the
+%% next msg.
%%
%% 5) If the prefetcher has not met its target then it goes back to
%% 1). Otherwise it just sits and waits for the mixed_queue to
@@ -125,29 +132,30 @@
%% on talk directly with the disk_queue and not via the
%% prefetcher. This is more efficient and the mixed_queue will use
%% normal priority blocking calls to the disk_queue and thus get
-%% better service that way.
+%% better service.
%%
%% The prefetcher may at this point have issued a
%% disk_queue:prefetch(Q) cast which has not yet been picked up by the
%% disk_queue. This msg won't go away and the disk_queue will
%% eventually find it. However, when it does, it'll simply read the
%% next message from the queue (which could now be empty), possibly
-%% populate the cache (no harm done) and try and call
-%% prefetcher:publish(Msg) which will result in an error, which the
-%% disk_queue catches, as the publish call is to a non-existant
-%% process. However, the state of the queue and the state of the
-%% message has not been altered so the mixed_queue will be able to
-%% fetch this message as if it had never been prefetched.
+%% populate the cache (no harm done), mark the message as deleted (oh
+%% well, not a spec violation, and better than the alternative) and
+%% try and call prefetcher:publish(Msg) which will result in an error,
+%% which the disk_queue catches, as the publish call is to a
+%% non-existant process. However, the state of the queue has not been
+%% altered so the mixed_queue will be able to fetch this message as if
+%% it had never been prefetched.
%%
-%% The only point at which the queue is advanced and the message
-%% marked as delivered is when the prefetcher replies to the publish
-%% call. At this point the message has been received by the prefetcher
-%% and so we guarantee it will be passed to the mixed_queue when the
-%% mixed_queue tries to drain the prefetcher. We must therefore ensure
-%% that this msg can't also be delivered to the mixed_queue directly
-%% by the disk_queue through the mixed_queue calling
-%% disk_queue:fetch(Q) which is why the prefetcher:publish function
-%% is a call and not a cast, thus blocking the disk_queue.
+%% The only point at which the queue is advanced is when the
+%% prefetcher replies to the publish call. At this point the message
+%% has been received by the prefetcher and so we guarantee it will be
+%% passed to the mixed_queue when the mixed_queue tries to drain the
+%% prefetcher. We must therefore ensure that this msg can't also be
+%% delivered to the mixed_queue directly by the disk_queue through the
+%% mixed_queue calling disk_queue:fetch(Q) which is why the
+%% prefetcher:publish function is a call and not a cast, thus blocking
+%% the disk_queue.
%%
%% Finally, the prefetcher is only created when the mixed_queue is
%% operating in mixed mode and it sees that the next N messages are
@@ -166,15 +174,7 @@
%% we have no guarantee that the message will really go out of the
%% socket. What we do still have is that messages which have the
%% redelivered bit set false really are guaranteed to have not been
-%% delivered already. In theory, it's possible that the disk_queue
-%% calls prefetcher:publish, blocks waiting for the reply. The
-%% prefetcher grabs the message, is drained, the message goes out of
-%% the socket and is delivered. The broker then crashes before the
-%% disk_queue processes the reply from the prefetcher, thus the fact
-%% the message has been delivered is not recorded. However, this can
-%% only affect a single message at a time. I.e. there is a tiny chance
-%% that the first message delivered on queue recovery that has the
-%% redelivery bit set false, has in fact been delivered before.
+%% delivered already.
start_link(Queue, Count) ->
gen_server2:start_link(?MODULE, [Queue, Count, self()], []).