diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-14 12:38:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-14 12:38:16 +0100 |
| commit | 6da285ab1609ba470cf3a22ef936d3290ea344d8 (patch) | |
| tree | 745e4c8a64a9b0a3bde356e37dae451a41195776 /src | |
| parent | d01625c878889c1230237d869a9c2c0b86a93766 (diff) | |
| download | rabbitmq-server-git-6da285ab1609ba470cf3a22ef936d3290ea344d8.tar.gz | |
Adjust the prefetcher in light of the bug Matthias discovered. Documentation updated too.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 144 |
2 files changed, 89 insertions, 91 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 76a901fdc6..2f8310581c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -43,8 +43,7 @@ tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, length/1, foldl/3, prefetch/1, - set_delivered_and_advance/2 + requeue_next_n/2, length/1, foldl/3, prefetch/1 ]). -export([filesync/0, cache_info/0]). @@ -267,8 +266,6 @@ -spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). --spec(set_delivered_and_advance/2 :: - (queue_name(), {msg_id(), seq_id()}) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). @@ -323,9 +320,6 @@ tx_commit(Q, PubMsgIds, AckSeqIds) tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). -set_delivered_and_advance(Q, MsgSeqId) -> - gen_server2:cast(?SERVER, {set_delivered_and_advance, Q, MsgSeqId}). - requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). @@ -547,16 +541,24 @@ handle_cast(report_memory, State) -> noreply1(State #dqstate { memory_report_timer = undefined }); handle_cast({prefetch, Q, From}, State) -> {ok, Result, State1} = internal_deliver(Q, true, true, false, State), - ok = rabbit_queue_prefetcher:publish(From, Result), - noreply(State1); -handle_cast({set_delivered_and_advance, Q, MsgSeqId}, State) -> - State2 = - case internal_deliver(Q, false, false, true, State) of - {ok, empty, State1} -> State1; - {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Rem}, State1} -> - State1 - end, - noreply(State2). 
+ Cont = + try + ok = rabbit_queue_prefetcher:publish(From, Result), + true + catch exit:{noproc, _} -> + false + end, + State3 = + case Cont of + true -> + case internal_deliver(Q, false, false, true, State1) of + {ok, empty, State2} -> State2; + {ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem}, + State2} -> State2 + end; + false -> State1 + end, + noreply(State3). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index 0265ba2bd2..bab5396e44 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -89,20 +89,22 @@ %% queue would have lost a msg that the mixed_queue would not pick %% up. %% -%% 3) The prefetcher hopefully receives the cast from -%% prefetcher:publish(Msg). It then adds to its internal queue and -%% calls disk_queue:set_delivered_and_advance(Q) which is a normal -%% priority cast. This cannot be low-priority because if it was, -%% the mixed_queue could come along, drain the prefetcher, thus +%% 3) The prefetcher hopefully receives the call from +%% prefetcher:publish(Msg). It replies immediately, and then adds +%% to its internal queue. A cast is not sufficient here because the +%% mixed_queue could come along, drain the prefetcher, thus %% catching the msg just sent by the disk_queue and then call %% disk_queue:deliver(Q) which is normal priority call, which could -%% overtake the low-priority -%% disk_queue:set_delivered_and_advance(Q) cast and thus result in -%% the same msg being delivered by the queue twice. +%% overtake a reply cast from the prefetcher to the disk queue, +%% which would result in the same message being delivered +%% twice. Thus when the disk_queue calls prefetcher:publish(Msg), +%% it is briefly blocked. 
However, a) the prefetcher replies +%% immediately, and b) the prefetcher should never have more than +%% one item in its mailbox anyway, so this should not cause a +%% problem to the disk_queue. %% -%% 4) The disk_queue receives the set_delivered_and_advance(Q) cast, -%% marks the msg at the head of the queue Q as delivered, and -%% advances the Q to the next msg. +%% 4) The disk_queue receives the reply, marks the msg at the head of +%% the queue Q as delivered, and advances the Q to the next msg. %% %% 5) If the prefetcher has not met its target then it goes back to %% 1). Otherwise it just sits and waits for the mixed_queue to @@ -123,45 +125,37 @@ %% on talk directly with the disk_queue and not via the %% prefetcher. This is more efficient and the mixed_queue will use %% normal priority blocking calls to the disk_queue and thus get -%% better service that way. When exiting in this way, two situations -%% could occur: +%% better service that way. %% -%% 1) The prefetcher has issued a disk_queue:prefetch(Q) which has not -%% yet been picked up by the disk_queue. This msg won't go away and -%% the disk_queue will eventually find it. However, when it does, -%% it'll simply read the next message from the queue (which could now -%% be empty), possibly populate the cache (no harm done) and try and -%% call prefetcher:publish(Msg) which will go no where. However, the -%% state of the queue and the state of the message has not been -%% altered so the mixed_queue will be able to fetch this message as if -%% it had never been prefetched. -%% -%% 2) The disk_queue has already picked up the disk_queue:prefetch(Q) -%% low priority message and has read the message and replied, by -%% calling prefetcher:publish(Msg). In fact, it's possible that -%% message is directly behind the call from mixed_queue to -%% prefetcher:drain(). 
Same reasoning as in 1) applies - neither the -%% queue's nor the message's state have been altered, so the -%% mixed_queue can absolutely go and fetch the message again. +%% The prefetcher may at this point have issued a +%% disk_queue:prefetch(Q) cast which has not yet been picked up by the +%% disk_queue. This msg won't go away and the disk_queue will +%% eventually find it. However, when it does, it'll simply read the +%% next message from the queue (which could now be empty), possibly +%% populate the cache (no harm done) and try and call +%% prefetcher:publish(Msg) which will result in an error, which the +%% disk_queue catches, as the publish call is to a non-existant +%% process. However, the state of the queue and the state of the +%% message has not been altered so the mixed_queue will be able to +%% fetch this message as if it had never been prefetched. %% %% The only point at which the queue is advanced and the message -%% marked as delivered is when the prefetcher calls -%% disk_queue:set_delivered_and_advance(Q). At this point the message -%% has been received by the prefetcher and so we guarantee it will be -%% passed to the mixed_queue when the mixed_queue tries to drain the -%% prefetcher. We must therefore ensure that this msg can't also be -%% delivered to the mixed_queue directly by the disk_queue through the -%% mixed_queue calling disk_queue:deliver(Q) which is why the -%% disk_queue:set_delivered_and_advance(Q) cast must be normal -%% priority (or at least match the priority of disk_queue:deliver(Q)). +%% marked as delivered is when the prefetcher replies to the publish +%% call. At this point the message has been received by the prefetcher +%% and so we guarantee it will be passed to the mixed_queue when the +%% mixed_queue tries to drain the prefetcher. 
We must therefore ensure +%% that this msg can't also be delivered to the mixed_queue directly +%% by the disk_queue through the mixed_queue calling +%% disk_queue:deliver(Q) which is why the prefetcher:publish function +%% is a call and not a cast, thus blocking the disk_queue. %% %% Finally, the prefetcher is only created when the mixed_queue is %% operating in mixed mode and it sees that the next N messages are -%% all on disk. During this phase, the mixed_queue can be asked to go -%% back to disk_only mode. When this happens, it calls -%% prefetcher:drain_and_stop() which behaves like two consecutive -%% calls to drain() - i.e. replies with all prefetched messages and -%% causes the prefetcher to exit. +%% all on disk, and the queue process is about to hibernate. During +%% this phase, the mixed_queue can be asked to go back to disk_only +%% mode. When this happens, it calls prefetcher:drain_and_stop() which +%% behaves like two consecutive calls to drain() - i.e. replies with +%% all prefetched messages and causes the prefetcher to exit. %% %% Note there is a flaw here in that we end up marking messages which %% have come through the prefetcher as delivered even if they don't @@ -172,25 +166,24 @@ %% we have no guarantee that the message will really go out of the %% socket. What we do still have is that messages which have the %% redelivered bit set false really are guaranteed to have not been -%% delivered already. Well, almost: if the disk_queue has a large back -%% log of messages then the prefetcher invocation of -%% disk_queue:set_delivered_and_advance(Q) may not be acted upon -%% before a crash. However, given that the prefetching is operating in -%% lock-step with the disk_queue, this means that at most, 1 (one) -%% message can fail to have its delivered flag raised. The alternative -%% is that disk_queue:set_delivered_and_advance(Q) could be made into -%% a call. 
However, if the disk_queue is heavily loaded, this can -%% block the prefetcher for some time, which in turn can block the -%% mixed_queue when it wants to drain the prefetcher. +%% delivered already. In theory, it's possible that the disk_queue +%% calls prefetcher:publish, blocks waiting for the reply. The +%% prefetcher grabs the message, is drained, the message goes out of +%% the socket and is delivered. The broker then crashes before the +%% disk_queue processes the reply from the prefetcher, thus the fact +%% the message has been delivered is not recorded. However, this can +%% only affect a single message at a time. I.e. there is a tiny chance +%% that the first message delivered on queue recovery that has the +%% redelivery bit set false, has in fact been delivered before. start_link(Queue, Count) -> gen_server2:start_link(?MODULE, [Queue, Count, self()], []). publish(Prefetcher, Obj = { #basic_message {}, _Size, _IsDelivered, _AckTag, _Remaining }) -> - gen_server2:cast(Prefetcher, {publish, Obj}); + gen_server2:call(Prefetcher, {publish, Obj}, infinity); publish(Prefetcher, empty) -> - gen_server2:cast(Prefetcher, publish_empty). + gen_server2:call(Prefetcher, publish_empty, infinity). drain(Prefetcher) -> gen_server2:call(Prefetcher, drain, infinity). @@ -213,6 +206,25 @@ init([Q, Count, QPid]) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
+handle_call({publish, { Msg = #basic_message {}, + _Size, IsDelivered, AckTag, _Remaining }}, + DiskQueue, State = + #pstate { fetched_count = Fetched, target_count = Target, + msg_buf = MsgBuf, buf_length = Length, queue = Q + }) -> + gen_server2:reply(DiskQueue, ok), + ok = case Fetched + 1 == Target of + true -> ok; + false -> rabbit_disk_queue:prefetch(Q) + end, + MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), + {noreply, State #pstate { fetched_count = Fetched + 1, + buf_length = Length + 1, + msg_buf = MsgBuf1 }, hibernate}; +handle_call(publish_empty, _From, State) -> + %% Very odd. This could happen if the queue is deleted or purged + %% and the mixed queue fails to shut us down. + {reply, ok, State, hibernate}; handle_call(drain, _From, State = #pstate { buf_length = 0 }) -> {stop, normal, empty, State}; handle_call(drain, _From, State = #pstate { fetched_count = Count, @@ -230,24 +242,8 @@ handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, buf_length = Length }) -> {stop, normal, {MsgBuf, Length}, State}. -handle_cast(publish_empty, State) -> - %% Very odd. This could happen if the queue is deleted or purged - %% and the mixed queue fails to shut us down. - {noreply, State, hibernate}; -handle_cast({publish, { Msg = #basic_message {}, - _Size, IsDelivered, AckTag, _Remaining }}, - State = #pstate { fetched_count = Fetched, target_count = Target, - msg_buf = MsgBuf, buf_length = Length, queue = Q - }) -> - ok = rabbit_disk_queue:set_delivered_and_advance(Q, AckTag), - ok = case Fetched + 1 == Target of - true -> ok; - false -> rabbit_disk_queue:prefetch(Q) - end, - MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), - {noreply, State #pstate { fetched_count = Fetched + 1, - buf_length = Length + 1, - msg_buf = MsgBuf1 }, hibernate}. +handle_cast(Msg, State) -> + exit({unexpected_message_cast_to_prefetcher, Msg, State}). handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #pstate { queue_mref = MRef }) -> |
