diff options
| -rw-r--r-- | src/rabbit_channel.erl | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 831058db31..cccd09dd35 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -43,11 +43,13 @@ -record(tx, {msgs, acks}). %% (1) %% (1) acks looks s.t. like this: -%% [{true,[[6,7,8],[5]]},{ack,[[4],[1,2,3]]}, ...] +%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...] %% -%% Each element is a pair consisting of a tag and a list of lists of +%% Each element is a pair consisting of a tag and a list of %% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' -%% (reject w requeue), 'false' (reject w/o requeue). +%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as +%% well as the list overall, are in "most-recent (generally youngest) +%% ack first" order. -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -813,7 +815,7 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_misc:with_exit_handler( OkFun, fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) - end, UAMQL), + end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method @@ -1045,8 +1047,8 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), lists:foreach( - fun ({ack, A}) -> ack(append_reverse(A), State1); - ({Requeue, A}) -> reject(Requeue, append_reverse(A), Limiter) + fun ({ack, A}) -> ack(lists:reverse(A), State1); + ({Requeue, A}) -> reject(Requeue, lists:reverse(A), Limiter) end, lists:reverse(Acks)), {noreply, maybe_complete_tx(State1#ch{tx = committing})}; @@ -1055,7 +1057,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = #tx{acks = Acks}}) -> - AcksL = append_reverse([append_reverse(L) || {_, L} <- Acks]), + AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; @@ -1219,6 +1221,7 @@ reject(DeliveryTag, Requeue, Multiple, State1#ch{tx = Tx#tx{acks = Acks1}} end}. +%% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> foreach_per_queue( fun (QPid, MsgIds) -> @@ -1249,8 +1252,9 @@ record_sent(ConsumerTag, AckRequired, end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. +%% NB: returns acks in youngest-first order collect_acks(Q, 0, true) -> - {queue:to_list(Q), queue:new()}; + {lists:reverse(queue:to_list(Q)), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> collect_acks([], [], Q, DeliveryTag, Multiple). @@ -1259,10 +1263,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {case ToAcc of - [] -> [UnackedMsg]; - _ -> lists:reverse([UnackedMsg | ToAcc]) - end, + {[UnackedMsg | ToAcc], case PrefixAcc of [] -> QTail; _ -> queue:join( @@ -1280,6 +1281,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. +%% NB: Acked is in youngest-first order ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_queue( fun (QPid, MsgIds) -> @@ -1306,10 +1308,12 @@ foreach_per_queue(_F, []) -> ok; foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case F(QPid, [MsgId]); +%% NB: UAL should be in youngest-first order; the tree values will +%% then be in oldest-first order foreach_per_queue(F, UAL) -> T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), lists:reverse(UAL)), + end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, @@ -1450,10 +1454,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -append_reverse(L) -> lists:append(lists:reverse(L)). - -ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, [Acked | Acks]} | L]; -ack_cons(Tag, Acked, Acks) -> [{Tag, [Acked]} | Acks]. +ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; +ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks]. maybe_complete_tx(State = #ch{tx = #tx{}}) -> State; |
