summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl36
1 files changed, 19 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 831058db31..cccd09dd35 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -43,11 +43,13 @@
-record(tx, {msgs, acks}). %% (1)
%% (1) acks looks s.t. like this:
-%% [{true,[[6,7,8],[5]]},{ack,[[4],[1,2,3]]}, ...]
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
%%
-%% Each element is a pair consisting of a tag and a list of lists of
+%% Each element is a pair consisting of a tag and a list of
%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
-%% (reject w requeue), 'false' (reject w/o requeue).
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -813,7 +815,7 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_misc:with_exit_handler(
OkFun,
fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
- end, UAMQL),
+ end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1045,8 +1047,8 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
lists:foreach(
- fun ({ack, A}) -> ack(append_reverse(A), State1);
- ({Requeue, A}) -> reject(Requeue, append_reverse(A), Limiter)
+ fun ({ack, A}) -> ack(lists:reverse(A), State1);
+ ({Requeue, A}) -> reject(Requeue, lists:reverse(A), Limiter)
end, lists:reverse(Acks)),
{noreply, maybe_complete_tx(State1#ch{tx = committing})};
@@ -1055,7 +1057,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = #tx{acks = Acks}}) ->
- AcksL = append_reverse([append_reverse(L) || {_, L} <- Acks]),
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1219,6 +1221,7 @@ reject(DeliveryTag, Requeue, Multiple,
State1#ch{tx = Tx#tx{acks = Acks1}}
end}.
+%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
@@ -1249,8 +1252,9 @@ record_sent(ConsumerTag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {queue:to_list(Q), queue:new()};
+ {lists:reverse(queue:to_list(Q)), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
@@ -1259,10 +1263,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {case ToAcc of
- [] -> [UnackedMsg];
- _ -> lists:reverse([UnackedMsg | ToAcc])
- end,
+ {[UnackedMsg | ToAcc],
case PrefixAcc of
[] -> QTail;
_ -> queue:join(
@@ -1280,6 +1281,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
+%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
@@ -1306,10 +1308,12 @@ foreach_per_queue(_F, []) ->
ok;
foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), lists:reverse(UAL)),
+ end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
@@ -1450,10 +1454,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-append_reverse(L) -> lists:append(lists:reverse(L)).
-
-ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, [Acked | Acks]} | L];
-ack_cons(Tag, Acked, Acks) -> [{Tag, [Acked]} | Acks].
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
maybe_complete_tx(State = #ch{tx = #tx{}}) ->
State;