summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-14 12:38:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-14 12:38:16 +0100
commit6da285ab1609ba470cf3a22ef936d3290ea344d8 (patch)
tree745e4c8a64a9b0a3bde356e37dae451a41195776 /src
parentd01625c878889c1230237d869a9c2c0b86a93766 (diff)
downloadrabbitmq-server-git-6da285ab1609ba470cf3a22ef936d3290ea344d8.tar.gz
Adjust the prefetcher in light of the bug matthias discovered. Documentation updated too.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl36
-rw-r--r--src/rabbit_queue_prefetcher.erl144
2 files changed, 89 insertions, 91 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 76a901fdc6..2f8310581c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -43,8 +43,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, length/1, foldl/3, prefetch/1,
- set_delivered_and_advance/2
+ requeue_next_n/2, length/1, foldl/3, prefetch/1
]).
-export([filesync/0, cache_info/0]).
@@ -267,8 +266,6 @@
-spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}],
[{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
--spec(set_delivered_and_advance/2 ::
- (queue_name(), {msg_id(), seq_id()}) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
@@ -323,9 +320,6 @@ tx_commit(Q, PubMsgIds, AckSeqIds)
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
-set_delivered_and_advance(Q, MsgSeqId) ->
- gen_server2:cast(?SERVER, {set_delivered_and_advance, Q, MsgSeqId}).
-
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
@@ -547,16 +541,24 @@ handle_cast(report_memory, State) ->
noreply1(State #dqstate { memory_report_timer = undefined });
handle_cast({prefetch, Q, From}, State) ->
{ok, Result, State1} = internal_deliver(Q, true, true, false, State),
- ok = rabbit_queue_prefetcher:publish(From, Result),
- noreply(State1);
-handle_cast({set_delivered_and_advance, Q, MsgSeqId}, State) ->
- State2 =
- case internal_deliver(Q, false, false, true, State) of
- {ok, empty, State1} -> State1;
- {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Rem}, State1} ->
- State1
- end,
- noreply(State2).
+ Cont =
+ try
+ ok = rabbit_queue_prefetcher:publish(From, Result),
+ true
+ catch exit:{noproc, _} ->
+ false
+ end,
+ State3 =
+ case Cont of
+ true ->
+ case internal_deliver(Q, false, false, true, State1) of
+ {ok, empty, State2} -> State2;
+ {ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem},
+ State2} -> State2
+ end;
+ false -> State1
+ end,
+ noreply(State3).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index 0265ba2bd2..bab5396e44 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -89,20 +89,22 @@
%% queue would have lost a msg that the mixed_queue would not pick
%% up.
%%
-%% 3) The prefetcher hopefully receives the cast from
-%% prefetcher:publish(Msg). It then adds to its internal queue and
-%% calls disk_queue:set_delivered_and_advance(Q) which is a normal
-%% priority cast. This cannot be low-priority because if it was,
-%% the mixed_queue could come along, drain the prefetcher, thus
+%% 3) The prefetcher hopefully receives the call from
+%% prefetcher:publish(Msg). It replies immediately, and then adds
+%% to its internal queue. A cast is not sufficient here because the
+%% mixed_queue could come along, drain the prefetcher, thus
%% catching the msg just sent by the disk_queue and then call
%% disk_queue:deliver(Q) which is a normal priority call, which could
-%% overtake the low-priority
-%% disk_queue:set_delivered_and_advance(Q) cast and thus result in
-%% the same msg being delivered by the queue twice.
+%% overtake a reply cast from the prefetcher to the disk queue,
+%% which would result in the same message being delivered
+%% twice. Thus when the disk_queue calls prefetcher:publish(Msg),
+%% it is briefly blocked. However, a) the prefetcher replies
+%% immediately, and b) the prefetcher should never have more than
+%% one item in its mailbox anyway, so this should not cause a
+%% problem to the disk_queue.
%%
-%% 4) The disk_queue receives the set_delivered_and_advance(Q) cast,
-%% marks the msg at the head of the queue Q as delivered, and
-%% advances the Q to the next msg.
+%% 4) The disk_queue receives the reply, marks the msg at the head of
+%% the queue Q as delivered, and advances the Q to the next msg.
%%
%% 5) If the prefetcher has not met its target then it goes back to
%% 1). Otherwise it just sits and waits for the mixed_queue to
@@ -123,45 +125,37 @@
%% on talk directly with the disk_queue and not via the
%% prefetcher. This is more efficient and the mixed_queue will use
%% normal priority blocking calls to the disk_queue and thus get
-%% better service that way. When exiting in this way, two situations
-%% could occur:
+%% better service that way.
%%
-%% 1) The prefetcher has issued a disk_queue:prefetch(Q) which has not
-%% yet been picked up by the disk_queue. This msg won't go away and
-%% the disk_queue will eventually find it. However, when it does,
-%% it'll simply read the next message from the queue (which could now
-%% be empty), possibly populate the cache (no harm done) and try and
-%% call prefetcher:publish(Msg) which will go nowhere. However, the
-%% state of the queue and the state of the message has not been
-%% altered so the mixed_queue will be able to fetch this message as if
-%% it had never been prefetched.
-%%
-%% 2) The disk_queue has already picked up the disk_queue:prefetch(Q)
-%% low priority message and has read the message and replied, by
-%% calling prefetcher:publish(Msg). In fact, it's possible that
-%% message is directly behind the call from mixed_queue to
-%% prefetcher:drain(). Same reasoning as in 1) applies - neither the
-%% queue's nor the message's state have been altered, so the
-%% mixed_queue can absolutely go and fetch the message again.
+%% The prefetcher may at this point have issued a
+%% disk_queue:prefetch(Q) cast which has not yet been picked up by the
+%% disk_queue. This msg won't go away and the disk_queue will
+%% eventually find it. However, when it does, it'll simply read the
+%% next message from the queue (which could now be empty), possibly
+%% populate the cache (no harm done) and try and call
+%% prefetcher:publish(Msg) which will result in an error, which the
+%% disk_queue catches, as the publish call is to a non-existent
+%% process. However, the state of the queue and the state of the
+%% message has not been altered so the mixed_queue will be able to
+%% fetch this message as if it had never been prefetched.
%%
%% The only point at which the queue is advanced and the message
-%% marked as delivered is when the prefetcher calls
-%% disk_queue:set_delivered_and_advance(Q). At this point the message
-%% has been received by the prefetcher and so we guarantee it will be
-%% passed to the mixed_queue when the mixed_queue tries to drain the
-%% prefetcher. We must therefore ensure that this msg can't also be
-%% delivered to the mixed_queue directly by the disk_queue through the
-%% mixed_queue calling disk_queue:deliver(Q) which is why the
-%% disk_queue:set_delivered_and_advance(Q) cast must be normal
-%% priority (or at least match the priority of disk_queue:deliver(Q)).
+%% marked as delivered is when the prefetcher replies to the publish
+%% call. At this point the message has been received by the prefetcher
+%% and so we guarantee it will be passed to the mixed_queue when the
+%% mixed_queue tries to drain the prefetcher. We must therefore ensure
+%% that this msg can't also be delivered to the mixed_queue directly
+%% by the disk_queue through the mixed_queue calling
+%% disk_queue:deliver(Q) which is why the prefetcher:publish function
+%% is a call and not a cast, thus blocking the disk_queue.
%%
%% Finally, the prefetcher is only created when the mixed_queue is
%% operating in mixed mode and it sees that the next N messages are
-%% all on disk. During this phase, the mixed_queue can be asked to go
-%% back to disk_only mode. When this happens, it calls
-%% prefetcher:drain_and_stop() which behaves like two consecutive
-%% calls to drain() - i.e. replies with all prefetched messages and
-%% causes the prefetcher to exit.
+%% all on disk, and the queue process is about to hibernate. During
+%% this phase, the mixed_queue can be asked to go back to disk_only
+%% mode. When this happens, it calls prefetcher:drain_and_stop() which
+%% behaves like two consecutive calls to drain() - i.e. replies with
+%% all prefetched messages and causes the prefetcher to exit.
%%
%% Note there is a flaw here in that we end up marking messages which
%% have come through the prefetcher as delivered even if they don't
@@ -172,25 +166,24 @@
%% we have no guarantee that the message will really go out of the
%% socket. What we do still have is that messages which have the
%% redelivered bit set false really are guaranteed to have not been
-%% delivered already. Well, almost: if the disk_queue has a large back
-%% log of messages then the prefetcher invocation of
-%% disk_queue:set_delivered_and_advance(Q) may not be acted upon
-%% before a crash. However, given that the prefetching is operating in
-%% lock-step with the disk_queue, this means that at most, 1 (one)
-%% message can fail to have its delivered flag raised. The alternative
-%% is that disk_queue:set_delivered_and_advance(Q) could be made into
-%% a call. However, if the disk_queue is heavily loaded, this can
-%% block the prefetcher for some time, which in turn can block the
-%% mixed_queue when it wants to drain the prefetcher.
+%% delivered already. In theory, it's possible that the disk_queue
+%% calls prefetcher:publish, blocks waiting for the reply. The
+%% prefetcher grabs the message, is drained, the message goes out of
+%% the socket and is delivered. The broker then crashes before the
+%% disk_queue processes the reply from the prefetcher, thus the fact
+%% the message has been delivered is not recorded. However, this can
+%% only affect a single message at a time. I.e. there is a tiny chance
+%% that the first message delivered on queue recovery that has the
+%% redelivery bit set false, has in fact been delivered before.
start_link(Queue, Count) ->
gen_server2:start_link(?MODULE, [Queue, Count, self()], []).
publish(Prefetcher, Obj = { #basic_message {}, _Size, _IsDelivered,
_AckTag, _Remaining }) ->
- gen_server2:cast(Prefetcher, {publish, Obj});
+ gen_server2:call(Prefetcher, {publish, Obj}, infinity);
publish(Prefetcher, empty) ->
- gen_server2:cast(Prefetcher, publish_empty).
+ gen_server2:call(Prefetcher, publish_empty, infinity).
drain(Prefetcher) ->
gen_server2:call(Prefetcher, drain, infinity).
@@ -213,6 +206,25 @@ init([Q, Count, QPid]) ->
{ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN,
?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+handle_call({publish, { Msg = #basic_message {},
+ _Size, IsDelivered, AckTag, _Remaining }},
+ DiskQueue, State =
+ #pstate { fetched_count = Fetched, target_count = Target,
+ msg_buf = MsgBuf, buf_length = Length, queue = Q
+ }) ->
+ gen_server2:reply(DiskQueue, ok),
+ ok = case Fetched + 1 == Target of
+ true -> ok;
+ false -> rabbit_disk_queue:prefetch(Q)
+ end,
+ MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
+ {noreply, State #pstate { fetched_count = Fetched + 1,
+ buf_length = Length + 1,
+ msg_buf = MsgBuf1 }, hibernate};
+handle_call(publish_empty, _From, State) ->
+ %% Very odd. This could happen if the queue is deleted or purged
+ %% and the mixed queue fails to shut us down.
+ {reply, ok, State, hibernate};
handle_call(drain, _From, State = #pstate { buf_length = 0 }) ->
{stop, normal, empty, State};
handle_call(drain, _From, State = #pstate { fetched_count = Count,
@@ -230,24 +242,8 @@ handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
buf_length = Length }) ->
{stop, normal, {MsgBuf, Length}, State}.
-handle_cast(publish_empty, State) ->
- %% Very odd. This could happen if the queue is deleted or purged
- %% and the mixed queue fails to shut us down.
- {noreply, State, hibernate};
-handle_cast({publish, { Msg = #basic_message {},
- _Size, IsDelivered, AckTag, _Remaining }},
- State = #pstate { fetched_count = Fetched, target_count = Target,
- msg_buf = MsgBuf, buf_length = Length, queue = Q
- }) ->
- ok = rabbit_disk_queue:set_delivered_and_advance(Q, AckTag),
- ok = case Fetched + 1 == Target of
- true -> ok;
- false -> rabbit_disk_queue:prefetch(Q)
- end,
- MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
- {noreply, State #pstate { fetched_count = Fetched + 1,
- buf_length = Length + 1,
- msg_buf = MsgBuf1 }, hibernate}.
+handle_cast(Msg, State) ->
+ exit({unexpected_message_cast_to_prefetcher, Msg, State}).
handle_info({'DOWN', MRef, process, _Pid, _Reason},
State = #pstate { queue_mref = MRef }) ->