diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-05 02:28:19 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-05 02:28:19 +0000 |
| commit | 064d70e798030b60b4c6a7136ed8070998fb4dbe (patch) | |
| tree | 5144711db524a51ed6e671bbb63dd8b7660b4421 | |
| parent | 1757140934da3ff745585025c5c0111fa6b432e3 (diff) | |
| download | rabbitmq-server-git-064d70e798030b60b4c6a7136ed8070998fb4dbe.tar.gz | |
nack messages when the first queue dies
| -rw-r--r-- | src/rabbit_channel.erl | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 526fb42881..e2437b8e55 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -298,12 +298,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, %% process_confirms to prevent each MsgSeqNo being removed from %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), + {Nack, SendFun} = case Reason of + normal -> {false, fun record_confirms/2}; + _ -> {true, fun send_nacks/2} + end, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), erase_queue_stats(QPid), - State3 = (case Reason of - normal -> fun record_confirms/2; - _ -> fun send_nacks/2 - end)(MXs, State2), + State3 = SendFun(MXs, State2), noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> @@ -513,23 +514,25 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> +process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, - State); - none -> Acc + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, + Acc, Nack, State); + none -> + Acc end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, + State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), @@ -544,10 +547,12 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> UQM end, Qs1 = gb_sets:del_element(QPid, Qs), - case gb_sets:is_empty(Qs1) of - true -> + %% If QPid somehow died initiating a nack, clear the message from + %% internal data-structures. Also, cleanup empty entries. + Empty = gb_sets:is_empty(Qs1), + if (Empty orelse Nack) -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; - false -> + true -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. |
