summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-05 02:28:19 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-05 02:28:19 +0000
commit064d70e798030b60b4c6a7136ed8070998fb4dbe (patch)
tree5144711db524a51ed6e671bbb63dd8b7660b4421
parent1757140934da3ff745585025c5c0111fa6b432e3 (diff)
downloadrabbitmq-server-git-064d70e798030b60b4c6a7136ed8070998fb4dbe.tar.gz
nack messages when the first queue dies
-rw-r--r--src/rabbit_channel.erl37
1 files changed, 21 insertions, 16 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 526fb42881..e2437b8e55 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -298,12 +298,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
%% process_confirms to prevent each MsgSeqNo being removed from
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
+ {Nack, SendFun} = case Reason of
+ normal -> {false, fun record_confirms/2};
+ _ -> {true, fun send_nacks/2}
+ end,
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
erase_queue_stats(QPid),
- State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
- end)(MXs, State2),
+ State3 = SendFun(MXs, State2),
noreply(queue_blocked(QPid, State3)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
@@ -513,23 +514,25 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
+process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
{MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
- State);
- none -> Acc
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack, State);
+ none ->
+ Acc
end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack,
+ State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
@@ -544,10 +547,12 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
UQM
end,
Qs1 = gb_sets:del_element(QPid, Qs),
- case gb_sets:is_empty(Qs1) of
- true ->
+ %% If QPid somehow died initiating a nack, clear the message from
+ %% internal data-structures. Also, cleanup empty entries.
+ Empty = gb_sets:is_empty(Qs1),
+ if (Empty orelse Nack) ->
{[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
- false ->
+ true ->
{MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.