diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-16 23:19:53 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-16 23:19:53 +0100 |
| commit | 3a69615fae0fabd893f59b99f5c0d0cfbc3f56f7 (patch) | |
| tree | d794c4bf70ce66775b1d25994f67eed1c57bc5f3 | |
| parent | c6ca402ff9b4763b3d1c7758db6c94b8354beff1 (diff) | |
| download | rabbitmq-server-git-3a69615fae0fabd893f59b99f5c0d0cfbc3f56f7.tar.gz | |
A Matthias stipulated refactoring. Also, in testing found some issues with the prefetcher, found a couple of places where it being stopped wasn't being recorded properly but that wasn't the cause of the problem. Eventually found the cause to be the disk_queue attempting to publish to the prefetcher and getting back exit:{normal,_} instead of exit:{noproc,_}, which I didn't know could happen. To test:
1) create 100 queues
2) fire 100 msgs to each queue ((1 msg to every queue)*100)
3) pin all queues to disk
4) unpin all queues
5) do step 2 again
6) wait for the prefetchers to start up (watch CPU load and disk activity)
7) pin all queues to disk
In step 7, the prefetchers are being stopped whilst the disk_queue is feeding them messages. I believe that exit:{noproc,_} comes back when sending a msg to a non-existant process, and exit:{normal,_} comes back if the process existed when we sent the message as part of the call but the process exited (normally) before our message was replied to.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 23 |
3 files changed, 22 insertions, 17 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 76399022c5..9cb233f8f9 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -545,8 +545,14 @@ handle_cast({prefetch, Q, From}, State) -> try ok = rabbit_queue_prefetcher:publish(From, Result), true - catch exit:{noproc, _} -> - false + catch + exit:{noproc, _} -> + %% prefetcher was stopped *before* we sent message + false; + exit:{normal, _} -> + %% prefetcher was stopped *after* our message was + %% sent, but before it was processed + false end, State3 = case Cont of diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index dddafaee0b..7bce3a0463 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -420,7 +420,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q, { msg_buf = queue:join(Fetched, MsgBuf2), prefetcher = case Status of finished -> undefined; - _ -> Prefetcher + continuing -> Prefetcher end }) end end. @@ -595,7 +595,7 @@ purge(State = #mqstate { queue = Q, mode = mixed, length = Length, rabbit_disk_queue:purge(Q), {Length, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, - memory_loss = Loss + QSize }}. + memory_loss = Loss + QSize, prefetcher = undefined }}. delete_queue(State = #mqstate { queue = Q, memory_size = QSize, memory_loss = Loss, prefetcher = Prefetcher @@ -606,7 +606,7 @@ delete_queue(State = #mqstate { queue = Q, memory_size = QSize, end, ok = rabbit_disk_queue:delete_queue(Q), {ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(), - memory_loss = Loss + QSize }}. + memory_loss = Loss + QSize, prefetcher = undefined }}. length(#mqstate { length = Length }) -> Length. diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 37afdc6c82..1ab5e7a894 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -197,8 +197,9 @@ handle_call({pin_to_disk, Pid}, _From, false -> case find_queue(Pid, Mixed) of {mixed, {OAlloc, _OActivity}} -> - Mixed1 = send_to_disk(Callbacks, Mixed, Pid), - {ok, State #state { mixed_queues = Mixed1, + ok = set_queue_mode(Callbacks, Pid, disk), + {ok, State #state { mixed_queues = + dict:erase(Pid, Mixed), available_tokens = Avail + OAlloc, disk_mode_pins = sets:add_element(Pid, Pins) @@ -257,8 +258,9 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, State #state { available_tokens = Avail1 }), case Req > Avail2 of true -> %% nowt we can do, send to disk - Mixed2 = send_to_disk(Callbacks, Mixed1, Pid), - {State1 #state { mixed_queues = Mixed2 }, disk}; + ok = set_queue_mode(Callbacks, Pid, disk), + {State1 #state { mixed_queues = + dict:erase(Pid, Mixed1) }, disk}; false -> %% keep mixed {State1 #state { mixed_queues = @@ -280,10 +282,7 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, %% reason, so stay as disk {State1, disk}; false -> %% can go to mixed mode - {Module, Function, Args} = - dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, - Args ++ [mixed]), + set_queue_mode(Callbacks, Pid, mixed), {State1 #state { mixed_queues = dict:store(Pid, {Req, MixedActivity}, Mixed1), @@ -348,10 +347,9 @@ find_queue(Pid, Mixed) -> error -> disk end. -send_to_disk(Callbacks, Mixed, Pid) -> +set_queue_mode(Callbacks, Pid, Mode) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [disk]), - dict:erase(Pid, Mixed). + erlang:apply(Module, Function, Args ++ [Mode]). tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) -> tidy_and_sum(lowrate, Mixed, @@ -434,7 +432,8 @@ free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1, AnaInit1, Req); {value, CataInit1, Pid, Alloc} -> - Mixed1 = send_to_disk(Callbacks, Mixed, Pid), + Mixed1 = dict:erase(Pid, Mixed), + ok = set_queue_mode(Callbacks, Pid, disk), case Req > Alloc of true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1, CataInit1, AnaInit, Req - Alloc); |
