diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 40 |
5 files changed, 44 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7d6e41f2f7..166a9576aa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -727,12 +727,10 @@ ensure_ttl_timer(State) -> State. mk_dead_letter_fun(_Reason, #q{dlx = undefined}) -> - fun(_MsgLookupFun, _AckTag, BQS) -> BQS end; + undefined; mk_dead_letter_fun(Reason, _State) -> - fun(MsgLookupFun, AckTag, BQS) -> - {Msg, BQS1} = MsgLookupFun(BQS), - gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}), - BQS1 + fun(Msg, AckTag) -> + gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. dead_letter_msg(Msg, AckTag, Reason, @@ -782,8 +780,8 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> error -> State end. -handle_queue_down(QPid, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> +handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, + unconfirmed_qm = UQM}) -> case dict:find(QPid, QMons) of error -> noreply(State); @@ -794,8 +792,12 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons, none -> noreply(State); {value, MsgSeqNosSet} -> - rabbit_log:warning("Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]), + case rabbit_misc:is_abnormal_termination(Reason) of + true -> rabbit_log:warning( + "Dead queue lost ~p messages~n", + [gb_sets:size(MsgSeqNosSet)]); + false -> ok + end, handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State#q{queue_monitors = dict:erase(QPid, QMons)}) @@ -1362,9 +1364,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% monitor-and-async- delete in case the connection goes away %% unexpectedly. {stop, normal, State}; -handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> +handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> handle_queue_down(DownPid, State1); + {ok, State1} -> handle_queue_down(DownPid, Reason, State1); {stop, State1} -> {stop, normal, State1} end; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3098b62101..f2bf3481e0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1159,14 +1159,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = - case Reason of - Reason when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> - {false, fun record_confirms/2}; - {shutdown, _} -> - {false, fun record_confirms/2}; - _ -> - {true, fun send_nacks/2} + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {true, fun send_nacks/2}; + false -> {false, fun record_confirms/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), SendFun(MXs, State2). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f8c8d48233..226470a53c 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -30,6 +30,7 @@ -export([start_cover/1]). -export([confirm_to_sender/2]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). +-export([is_abnormal_termination/1]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). @@ -132,6 +133,7 @@ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(is_abnormal_termination/1 :: (any()) -> boolean()). -spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) @@ -402,6 +404,17 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). +is_abnormal_termination(Reason) -> + case Reason of + Reason when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> + false; + {shutdown, _} -> + false; + _ -> + true + end. + with_user(Username, Thunk) -> fun () -> case mnesia:read({rabbit_user, Username}) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ba0fffd65c..434366485a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2369,7 +2369,7 @@ test_dropwhile(VQ0) -> VQ4. -dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end. +dummy_msg_fun() -> fun(_Msg, _SeqId) -> ok end. test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 95b47d8343..c646a3dff0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -593,8 +593,9 @@ dropwhile(Pred, MsgFun, State) -> {true, _} -> {{_, _, AckTag, _}, State2} = internal_fetch(true, MsgStatus, State1), - dropwhile(Pred, MsgFun, MsgFun(read_msg_callback(MsgStatus), - AckTag, State2)); + {MsgStatus, State3} = read_msg(MsgStatus, State2), + MsgFun(MsgStatus#msg_status.msg, AckTag), + dropwhile(Pred, MsgFun, State3); {false, _} -> a(in_r(MsgStatus, State1)) end @@ -612,17 +613,6 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. -read_msg_callback(#msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }) -> - fun(State) -> read_msg_common(MsgId, IsPersistent, State) end; - -read_msg_callback(#msg_status{ msg = Msg }) -> - fun(State) -> {Msg, State} end; - -read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> - fun(State) -> read_msg_common(MsgId, IsPersistent, State) end. - ack([], _Fun, State) -> {[], State}; @@ -650,8 +640,10 @@ ack(AckTags, undefined, State) -> ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> {[], lists:foldl( fun(SeqId, State1) -> - AckEntry = gb_trees:get(SeqId, PA), - MsgFun(read_msg_callback(AckEntry), SeqId, State1) + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 end, State, AckTags)}. requeue(AckTags, #vqstate { delta = Delta, @@ -1062,19 +1054,15 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, - State) -> - {Msg, State1} = read_msg_common(MsgId, IsPersistent, State), - {MsgStatus #msg_status { msg = Msg }, State1}; -read_msg(MsgStatus, State) -> - {MsgStatus, State}. - -read_msg_common(MsgId, IsPersistent, - State = #vqstate{ ram_msg_count = RamMsgCount, - msg_store_clients = MSCState }) -> + State = #vqstate{ ram_msg_count = RamMsgCount, + msg_store_clients = MSCState }) -> {{ok, Msg = #basic_message{}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}. + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. internal_fetch(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, |
