summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-16 23:19:53 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-16 23:19:53 +0100
commit3a69615fae0fabd893f59b99f5c0d0cfbc3f56f7 (patch)
treed794c4bf70ce66775b1d25994f67eed1c57bc5f3
parentc6ca402ff9b4763b3d1c7758db6c94b8354beff1 (diff)
downloadrabbitmq-server-git-3a69615fae0fabd893f59b99f5c0d0cfbc3f56f7.tar.gz
A Matthias stipulated refactoring. Also, in testing found some issues with the prefetcher, found a couple of places where it being stopped wasn't being recorded properly but that wasn't the cause of the problem. Eventually found the cause to be the disk_queue attempting to publish to the prefetcher and getting back exit:{normal,_} instead of exit:{noproc,_}, which I didn't know could happen. To test:
1) create 100 queues 2) fire 100 msgs to each queue ((1 msg to every queue)*100) 3) pin all queues to disk 4) unpin all queues 5) do step 2 again 6) wait for the prefetchers to start up (watch CPU load and disk activity) 7) pin all queues to disk In step 7, the prefetchers are being stopped whilst the disk_queue is feeding them messages. I believe that exit:{noproc,_} comes back when sending a msg to a non-existant process, and exit:{normal,_} comes back if the process existed when we sent the message as part of the call but the process exited (normally) before our message was replied to.
-rw-r--r--src/rabbit_disk_queue.erl10
-rw-r--r--src/rabbit_mixed_queue.erl6
-rw-r--r--src/rabbit_queue_mode_manager.erl23
3 files changed, 22 insertions, 17 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 76399022c5..9cb233f8f9 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -545,8 +545,14 @@ handle_cast({prefetch, Q, From}, State) ->
try
ok = rabbit_queue_prefetcher:publish(From, Result),
true
- catch exit:{noproc, _} ->
- false
+ catch
+ exit:{noproc, _} ->
+ %% prefetcher was stopped *before* we sent message
+ false;
+ exit:{normal, _} ->
+ %% prefetcher was stopped *after* our message was
+ %% sent, but before it was processed
+ false
end,
State3 =
case Cont of
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index dddafaee0b..7bce3a0463 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -420,7 +420,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{ msg_buf = queue:join(Fetched, MsgBuf2),
prefetcher = case Status of
finished -> undefined;
- _ -> Prefetcher
+ continuing -> Prefetcher
end })
end
end.
@@ -595,7 +595,7 @@ purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
rabbit_disk_queue:purge(Q),
{Length,
State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
- memory_loss = Loss + QSize }}.
+ memory_loss = Loss + QSize, prefetcher = undefined }}.
delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss, prefetcher = Prefetcher
@@ -606,7 +606,7 @@ delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
end,
ok = rabbit_disk_queue:delete_queue(Q),
{ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
- memory_loss = Loss + QSize }}.
+ memory_loss = Loss + QSize, prefetcher = undefined }}.
length(#mqstate { length = Length }) ->
Length.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 37afdc6c82..1ab5e7a894 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -197,8 +197,9 @@ handle_call({pin_to_disk, Pid}, _From,
false ->
case find_queue(Pid, Mixed) of
{mixed, {OAlloc, _OActivity}} ->
- Mixed1 = send_to_disk(Callbacks, Mixed, Pid),
- {ok, State #state { mixed_queues = Mixed1,
+ ok = set_queue_mode(Callbacks, Pid, disk),
+ {ok, State #state { mixed_queues =
+ dict:erase(Pid, Mixed),
available_tokens = Avail + OAlloc,
disk_mode_pins =
sets:add_element(Pid, Pins)
@@ -257,8 +258,9 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
State #state { available_tokens = Avail1 }),
case Req > Avail2 of
true -> %% nowt we can do, send to disk
- Mixed2 = send_to_disk(Callbacks, Mixed1, Pid),
- {State1 #state { mixed_queues = Mixed2 }, disk};
+ ok = set_queue_mode(Callbacks, Pid, disk),
+ {State1 #state { mixed_queues =
+ dict:erase(Pid, Mixed1) }, disk};
false -> %% keep mixed
{State1 #state
{ mixed_queues =
@@ -280,10 +282,7 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
%% reason, so stay as disk
{State1, disk};
false -> %% can go to mixed mode
- {Module, Function, Args} =
- dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function,
- Args ++ [mixed]),
+ set_queue_mode(Callbacks, Pid, mixed),
{State1 #state {
mixed_queues =
dict:store(Pid, {Req, MixedActivity}, Mixed1),
@@ -348,10 +347,9 @@ find_queue(Pid, Mixed) ->
error -> disk
end.
-send_to_disk(Callbacks, Mixed, Pid) ->
+set_queue_mode(Callbacks, Pid, Mode) ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [disk]),
- dict:erase(Pid, Mixed).
+ erlang:apply(Module, Function, Args ++ [Mode]).
tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) ->
tidy_and_sum(lowrate, Mixed,
@@ -434,7 +432,8 @@ free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) ->
free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1,
AnaInit1, Req);
{value, CataInit1, Pid, Alloc} ->
- Mixed1 = send_to_disk(Callbacks, Mixed, Pid),
+ Mixed1 = dict:erase(Pid, Mixed),
+ ok = set_queue_mode(Callbacks, Pid, disk),
case Req > Alloc of
true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1,
CataInit1, AnaInit, Req - Alloc);