summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl112
-rw-r--r--src/rabbit_backing_queue.erl26
-rw-r--r--src/rabbit_mirror_queue_master.erl15
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl29
5 files changed, 101 insertions, 89 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ce3b4ce88b..6b8f8c611b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,25 +716,46 @@ drop_expired_msgs(State = #q{dlx = DLX,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> BQ:dropwhile(ExpirePred, BQS);
- _ -> {Next, Msgs, BQS2} =
- BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/4,
- [], BQS),
- case Msgs of
- [] -> ok;
- _ -> (dead_letter_fun(expired))(
- lists:reverse(Msgs))
- end,
- {Next, BQS2}
- end,
+ {Props, State1} =
+ case DLX of
+ undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}};
+ _ -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} ->
+ drop_expired_messages(ExpirePred, X, State);
+ {error, not_found} ->
+ {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}}
+ end
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
-
-accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+ end, State1).
+
+drop_expired_messages(ExpirePred, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Next, {ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ BQ:fetchwhile(
+ ExpirePred,
+ fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, expired, X, RK, SeqNo, QName) of
+ [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons};
+ QPids -> {ConfirmImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1),
+ {Next, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
ensure_ttl_timer(undefined, State) ->
State;
@@ -756,10 +777,12 @@ ensure_ttl_timer(_Expiry, State) ->
State.
dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
+ fun(Msg, AckTag) ->
+ gen_server2:cast(self(), {dead_letter, Msg, AckTag, Reason})
+ end.
-dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = make_dead_letter_msg(Reason, Msg, State),
+dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
@@ -838,19 +861,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
end
end.
-make_dead_letter_msg(Reason,
- Msg = #basic_message{content = Content,
+make_dead_letter_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
+ Reason, DLX, RK, #resource{name = QName}) ->
{DeathRoutingKeys, HeadersFun1} =
- case DlxRoutingKey of
+ case RK of
undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[DlxRoutingKey],
- fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- #resource{name = QName} = qname(State),
TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
@@ -1202,8 +1222,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1251,29 +1272,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
+handle_cast({dead_letter, Msg, AckTag, Reason},
+ State = #q{dlx = XName,
+ dlx_routing_key = RK,
+ publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
+ case dead_letter_publish(Msg, Reason, X, RK, SeqNo, qname(State)) of
+ [] -> cleanup_after_confirm([AckTag], State);
+ QPids -> UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ State#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}
+ end;
{error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
+ cleanup_after_confirm([AckTag], State)
end;
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 272df5c1b7..99b5946e59 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,8 +35,7 @@
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -133,14 +132,11 @@
-> {rabbit_types:message_properties() | undefined, state()}.
%% Like dropwhile, except messages are fetched in "require
-%% acknowledgement" mode and are passed, together with their Delivered
-%% flag and ack tag, to the supplied function. The function is also
-%% fed an accumulator. The result of fetchwhile is as for dropwhile
-%% plus the accumulator.
--callback fetchwhile(msg_pred(),
- fun ((rabbit_types:basic_message(), boolean(), ack(), A)
- -> A),
- A, state())
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
-> {rabbit_types:message_properties() | undefined,
A, state()}.
@@ -156,14 +152,14 @@
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
@@ -233,7 +229,7 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e3d967bc53..e857f39526 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2,
- requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -281,10 +281,6 @@ ack(AckTags, State = #state { gm = GM,
end,
{MsgIds, State #state { backing_queue_state = BQS1 }}.
-foreach_ack(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
-
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -292,6 +288,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
fold(Fun, Acc, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fold(Fun, Acc, BQS),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b499c59b30..09ed3d0890 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2434,7 +2434,7 @@ test_dropfetchwhile(VQ0) ->
{#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
rabbit_variable_queue:fetchwhile(
fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
- fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) ->
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
{[Msg | MsgAcc], [AckTag | AckAcc]}
end, {[], []}, VQ1),
true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
@@ -2473,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
fun (VQ1) ->
{_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
fun (_) -> false end,
- fun (_, _, _, A) -> A end,
+ fun (_, _, A) -> A end,
ok, VQ1),
VQ2
end, VQ0).
@@ -2608,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
- VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3e4c7c864f..05468a6ee8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,10 +19,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
- fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) ->
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, IsDelivered, AckTag}, State3} =
+ {{Msg, _IsDelivered, AckTag}, State3} =
internal_fetch(true, MsgStatus1, State2),
- Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
- fetchwhile(Pred, Fun, Acc1, State3);
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -650,16 +649,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-foreach_ack(undefined, State, _AckTags) ->
- State;
-foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -681,6 +670,16 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(
+ fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
+ {#msg_status { msg = Msg }, State1} =
+ read_msg(gb_trees:get(SeqId, PA), false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
fold(Fun, Acc, #vqstate { q1 = Q1,
q2 = Q2,
delta = #delta { start_seq_id = DeltaSeqId,