summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl40
5 files changed, 44 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7d6e41f2f7..166a9576aa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -727,12 +727,10 @@ ensure_ttl_timer(State) ->
State.
mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgLookupFun, _AckTag, BQS) -> BQS end;
+ undefined;
mk_dead_letter_fun(Reason, _State) ->
- fun(MsgLookupFun, AckTag, BQS) ->
- {Msg, BQS1} = MsgLookupFun(BQS),
- gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}),
- BQS1
+ fun(Msg, AckTag) ->
+ gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
dead_letter_msg(Msg, AckTag, Reason,
@@ -782,8 +780,8 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
error -> State
end.
-handle_queue_down(QPid, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
+ unconfirmed_qm = UQM}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
@@ -794,8 +792,12 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons,
none ->
noreply(State);
{value, MsgSeqNosSet} ->
- rabbit_log:warning("Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]),
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> rabbit_log:warning(
+ "Dead queue lost ~p messages~n",
+ [gb_sets:size(MsgSeqNosSet)]);
+ false -> ok
+ end,
handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
State#q{queue_monitors =
dict:erase(QPid, QMons)})
@@ -1362,9 +1364,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
{stop, normal, State};
-handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, State1);
+ {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
{stop, State1} -> {stop, normal, State1}
end;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3098b62101..f2bf3481e0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1159,14 +1159,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
- case Reason of
- Reason when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown ->
- {false, fun record_confirms/2};
- {shutdown, _} ->
- {false, fun record_confirms/2};
- _ ->
- {true, fun send_nacks/2}
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {true, fun send_nacks/2};
+ false -> {false, fun record_confirms/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
SendFun(MXs, State2).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f8c8d48233..226470a53c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -30,6 +30,7 @@
-export([start_cover/1]).
-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
+-export([is_abnormal_termination/1]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
@@ -132,6 +133,7 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -402,6 +404,17 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
+is_abnormal_termination(Reason) ->
+ case Reason of
+ Reason when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown ->
+ false;
+ {shutdown, _} ->
+ false;
+ _ ->
+ true
+ end.
+
with_user(Username, Thunk) ->
fun () ->
case mnesia:read({rabbit_user, Username}) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ba0fffd65c..434366485a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2369,7 +2369,7 @@ test_dropwhile(VQ0) ->
VQ4.
-dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end.
+dummy_msg_fun() -> fun(_Msg, _SeqId) -> ok end.
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 95b47d8343..c646a3dff0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -593,8 +593,9 @@ dropwhile(Pred, MsgFun, State) ->
{true, _} ->
{{_, _, AckTag, _}, State2} =
internal_fetch(true, MsgStatus, State1),
- dropwhile(Pred, MsgFun, MsgFun(read_msg_callback(MsgStatus),
- AckTag, State2));
+ {MsgStatus, State3} = read_msg(MsgStatus, State2),
+ MsgFun(MsgStatus#msg_status.msg, AckTag),
+ dropwhile(Pred, MsgFun, State3);
{false, _} ->
a(in_r(MsgStatus, State1))
end
@@ -612,17 +613,6 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
-read_msg_callback(#msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent }) ->
- fun(State) -> read_msg_common(MsgId, IsPersistent, State) end;
-
-read_msg_callback(#msg_status{ msg = Msg }) ->
- fun(State) -> {Msg, State} end;
-
-read_msg_callback({IsPersistent, MsgId, _MsgProps}) ->
- fun(State) -> read_msg_common(MsgId, IsPersistent, State) end.
-
ack([], _Fun, State) ->
{[], State};
@@ -650,8 +640,10 @@ ack(AckTags, undefined, State) ->
ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
{[], lists:foldl(
fun(SeqId, State1) ->
- AckEntry = gb_trees:get(SeqId, PA),
- MsgFun(read_msg_callback(AckEntry), SeqId, State1)
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
end, State, AckTags)}.
requeue(AckTags, #vqstate { delta = Delta,
@@ -1062,19 +1054,15 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
- State) ->
- {Msg, State1} = read_msg_common(MsgId, IsPersistent, State),
- {MsgStatus #msg_status { msg = Msg }, State1};
-read_msg(MsgStatus, State) ->
- {MsgStatus, State}.
-
-read_msg_common(MsgId, IsPersistent,
- State = #vqstate{ ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState }) ->
+ State = #vqstate{ ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState }) ->
{{ok, Msg = #basic_message{}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate { ram_msg_count = RamMsgCount + 1,
- msg_store_clients = MSCState1 }}.
+ {MsgStatus #msg_status { msg = Msg },
+ State #vqstate { ram_msg_count = RamMsgCount + 1,
+ msg_store_clients = MSCState1 }};
+read_msg(MsgStatus, State) ->
+ {MsgStatus, State}.
internal_fetch(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,