diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-20 22:42:39 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-20 22:42:39 +0100 |
| commit | 79823b5509b0853b0760879186adaa68e158aea3 (patch) | |
| tree | a9e10df275e70d52c727a3307caddc010c137620 | |
| parent | 4345618a753e8667365e3eac3890ec81140336f5 (diff) | |
| download | rabbitmq-server-git-79823b5509b0853b0760879186adaa68e158aea3.tar.gz | |
Made messages be marked as delivered during prefetch *before* they are passed to the prefetcher.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 82 |
2 files changed, 43 insertions, 43 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 9e70f8c5a5..96125031dc 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -529,7 +529,7 @@ handle_cast({set_mode, Mode}, State) -> mixed -> fun to_ram_disk_mode/1 end)(State)); handle_cast({prefetch, Q, From}, State) -> - {ok, Result, State1} = internal_fetch(Q, true, true, false, State), + {ok, Result, State1} = internal_fetch(Q, true, false, false, State), Cont = rabbit_misc:with_exit_handler( fun () -> false end, fun () -> @@ -539,7 +539,7 @@ handle_cast({prefetch, Q, From}, State) -> State3 = case Cont of true -> - case internal_fetch(Q, false, false, true, State1) of + case internal_fetch(Q, false, true, true, State1) of {ok, empty, State2} -> State2; {ok, {_MsgId, _IsPersistent, _Delivered, _MsgSeqId, _Rem}, State2} -> State2 diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index ad6b1ce2d4..f22aa6afe3 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -82,29 +82,36 @@ %% priority cast. Note that in the mean time, the mixed_queue could %% have come along, found the prefetcher empty, asked it to %% exit. This means the effective "reply" from the disk_queue will -%% go no where. As a result, the disk_queue must perform no -%% modification to the status of the message *or the queue* - do -%% not mark the message delivered, and do not advance the queue. If -%% it did advance the queue and the msg was then lost, then the -%% queue would have lost a msg that the mixed_queue would not pick -%% up. +%% go no where. As a result, the disk_queue should not advance the +%% queue. However, it does mark the messages as delivered. The +%% reasoning is that if it didn't, there would be the possibility +%% that the message was delivered without it being marked as such +%% on disk. We must maintain the property that a message which is +%% marked as non-redelivered really hasn't been delivered anywhere +%% before. The downside is that should the prefetcher not receive +%% this message, the queue will then fetch the message from the +%% disk_queue directly, and this message will have its delivered +%% bit set. The queue will not be advanced though - if it did +%% advance the queue and the msg was then lost, then the queue +%% would have lost a msg that the mixed_queue would not pick up. %% %% 3) The prefetcher hopefully receives the call from %% prefetcher:publish(Msg). It replies immediately, and then adds -%% to its internal queue. A cast is not sufficient here because the -%% mixed_queue could come along, drain the prefetcher, thus -%% catching the msg just sent by the disk_queue and then call -%% disk_queue:fetch(Q) which is normal priority call, which could -%% overtake a reply cast from the prefetcher to the disk queue, -%% which would result in the same message being delivered +%% to its internal queue. A cast is not sufficient as a pseudo +%% "reply" here because the mixed_queue could come along, drain the +%% prefetcher, thus catching the msg just sent by the disk_queue +%% and then call disk_queue:fetch(Q) which is normal priority call, +%% which could overtake a reply cast from the prefetcher to the +%% disk queue, resulting in the same message being delivered %% twice. Thus when the disk_queue calls prefetcher:publish(Msg), %% it is briefly blocked. However, a) the prefetcher replies %% immediately, and b) the prefetcher should never have more than -%% one item in its mailbox anyway, so this should not cause a -%% problem to the disk_queue. +%% two items in its mailbox anyway (one from the queue process / +%% mixed_queue and one from the disk_queue), so this should not +%% cause a problem to the disk_queue. %% -%% 4) The disk_queue receives the reply, marks the msg at the head of -%% the queue Q as delivered, and advances the Q to the next msg. +%% 4) The disk_queue receives the reply, and advances the Q to the +%% next msg. %% %% 5) If the prefetcher has not met its target then it goes back to %% 1). Otherwise it just sits and waits for the mixed_queue to @@ -125,29 +132,30 @@ %% on talk directly with the disk_queue and not via the %% prefetcher. This is more efficient and the mixed_queue will use %% normal priority blocking calls to the disk_queue and thus get -%% better service that way. +%% better service. %% %% The prefetcher may at this point have issued a %% disk_queue:prefetch(Q) cast which has not yet been picked up by the %% disk_queue. This msg won't go away and the disk_queue will %% eventually find it. However, when it does, it'll simply read the %% next message from the queue (which could now be empty), possibly -%% populate the cache (no harm done) and try and call -%% prefetcher:publish(Msg) which will result in an error, which the -%% disk_queue catches, as the publish call is to a non-existant -%% process. However, the state of the queue and the state of the -%% message has not been altered so the mixed_queue will be able to -%% fetch this message as if it had never been prefetched. +%% populate the cache (no harm done), mark the message as deleted (oh +%% well, not a spec violation, and better than the alternative) and +%% try and call prefetcher:publish(Msg) which will result in an error, +%% which the disk_queue catches, as the publish call is to a +%% non-existant process. However, the state of the queue has not been +%% altered so the mixed_queue will be able to fetch this message as if +%% it had never been prefetched. %% -%% The only point at which the queue is advanced and the message -%% marked as delivered is when the prefetcher replies to the publish -%% call. At this point the message has been received by the prefetcher -%% and so we guarantee it will be passed to the mixed_queue when the -%% mixed_queue tries to drain the prefetcher. We must therefore ensure -%% that this msg can't also be delivered to the mixed_queue directly -%% by the disk_queue through the mixed_queue calling -%% disk_queue:fetch(Q) which is why the prefetcher:publish function -%% is a call and not a cast, thus blocking the disk_queue. +%% The only point at which the queue is advanced is when the +%% prefetcher replies to the publish call. At this point the message +%% has been received by the prefetcher and so we guarantee it will be +%% passed to the mixed_queue when the mixed_queue tries to drain the +%% prefetcher. We must therefore ensure that this msg can't also be +%% delivered to the mixed_queue directly by the disk_queue through the +%% mixed_queue calling disk_queue:fetch(Q) which is why the +%% prefetcher:publish function is a call and not a cast, thus blocking +%% the disk_queue. %% %% Finally, the prefetcher is only created when the mixed_queue is %% operating in mixed mode and it sees that the next N messages are @@ -166,15 +174,7 @@ %% we have no guarantee that the message will really go out of the %% socket. What we do still have is that messages which have the %% redelivered bit set false really are guaranteed to have not been -%% delivered already. In theory, it's possible that the disk_queue -%% calls prefetcher:publish, blocks waiting for the reply. The -%% prefetcher grabs the message, is drained, the message goes out of -%% the socket and is delivered. The broker then crashes before the -%% disk_queue processes the reply from the prefetcher, thus the fact -%% the message has been delivered is not recorded. However, this can -%% only affect a single message at a time. I.e. there is a tiny chance -%% that the first message delivered on queue recovery that has the -%% redelivery bit set false, has in fact been delivered before. +%% delivered already. start_link(Queue, Count) -> gen_server2:start_link(?MODULE, [Queue, Count, self()], []). |
