diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2022-03-20 16:29:17 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-03-20 16:29:17 +0400 |
| commit | bb541e57caa7044ec877482c49eff55b6ac02a73 (patch) | |
| tree | 506845b01b583468af5006cb3368d721383cc865 | |
| parent | 5cb81796cbd521df4e0336bfc6c1595d4d7b04bf (diff) | |
| parent | a43d9890d51537afde003e66f7e808e3a03fb298 (diff) | |
| download | rabbitmq-server-git-bb541e57caa7044ec877482c49eff55b6ac02a73.tar.gz | |
Merge pull request #4307 from rabbitmq/mk-rabbit-channel-naming-borrowed-from-rabbitmq-server-4004-and-4305
Naming from #4305
| -rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 71 |
1 files changed, 42 insertions, 29 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 7fbaf2e499..199f226a21 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -120,11 +120,18 @@ writer_gc_threshold }). --record(pending_ack, {delivery_tag, +-record(pending_ack, { + %% delivery identifier used by clients + %% to acknowledge and reject deliveries + delivery_tag, + %% consumer tag tag, delivered_at, - queue, %% queue name - msg_id}). + %% queue name + queue, + %% message ID used by queue and message store implementations + msg_id + }). -record(ch, {cfg :: #conf{}, %% limiter state, see rabbit_limiter @@ -1342,7 +1349,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, case Tx of - none -> {State2, Actions} = ack(Acked, State1), + none -> {State2, Actions} = settle_acks(Acked, State1), handle_queue_actions(Actions, State2); {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks), State1#ch{tx = {Msgs, Acks1}} @@ -1722,7 +1729,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks}, Rev = fun (X) -> lists:reverse(lists:sort(X)) end, {State2, Actions2} = lists:foldl(fun ({ack, A}, {Acc, Actions}) -> - {Acc0, Actions0} = ack(Rev(A), Acc), + {Acc0, Actions0} = settle_acks(Rev(A), Acc), {Acc0, Actions ++ Actions0}; ({Requeue, A}, {Acc, Actions}) -> {Acc0, Actions0} = internal_reject(Requeue, Rev(A), Limiter, Acc), @@ -2029,37 +2036,43 @@ record_sent(Type, QueueType, Tag, AckRequired, end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. -%% NB: returns acks in youngest-first order -collect_acks(Q, 0, true) -> - {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()}; -collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks([], [], Q, DeliveryTag, Multiple). - -collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> - case ?QUEUE:out(Q) of - {{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDeliveryTag}}, - QTail} -> - if CurrentDeliveryTag == DeliveryTag -> - {[UnackedMsg | ToAcc], - case PrefixAcc of - [] -> QTail; +%% Records a client-sent acknowledgement. Handles both single delivery acks +%% and multi-acks. +%% +%% Returns a triple of acknowledged pending acks, remaining pending acks, +%% and outdated pending acks (if any). +%% Sorts each group in the youngest-first order (ascending by delivery tag). +collect_acks(UAMQ, 0, true) -> + {lists:reverse(?QUEUE:to_list(UAMQ)), ?QUEUE:new()}; +collect_acks(UAMQ, DeliveryTag, Multiple) -> + collect_acks([], [], UAMQ, DeliveryTag, Multiple). + +collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) -> + case ?QUEUE:out(UAMQ) of + {{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT}}, + UAMQTail} -> + if CurrentDT == DeliveryTag -> + {[UnackedMsg | AcknowledgedAcc], + case RemainingAcc of + [] -> UAMQTail; _ -> ?QUEUE:join( - ?QUEUE:from_list(lists:reverse(PrefixAcc)), - QTail) + ?QUEUE:from_list(lists:reverse(RemainingAcc)), + UAMQTail) end}; Multiple -> - collect_acks([UnackedMsg | ToAcc], PrefixAcc, - QTail, DeliveryTag, Multiple); + collect_acks([UnackedMsg | AcknowledgedAcc], RemainingAcc, + UAMQTail, DeliveryTag, Multiple); true -> - collect_acks(ToAcc, [UnackedMsg | PrefixAcc], - QTail, DeliveryTag, Multiple) + collect_acks(AcknowledgedAcc, [UnackedMsg | RemainingAcc], + UAMQTail, DeliveryTag, Multiple) end; {empty, _} -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -%% NB: Acked is in youngest-first order -ack(Acked, State = #ch{queue_states = QueueStates0}) -> +%% Settles (acknowledges) messages at the queue replica process level. +%% This happens in the youngest-first order (ascending by delivery tag). +settle_acks(Acks, State = #ch{queue_states = QueueStates0}) -> {QueueStates, Actions} = foreach_per_queue( fun ({QRef, CTag}, MsgIds, {Acc0, ActionsAcc0}) -> @@ -2071,8 +2084,8 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) -> {protocol_error, ErrorType, Reason, ReasonArgs} -> rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs) end - end, Acked, {QueueStates0, []}), - ok = notify_limiter(State#ch.limiter, Acked), + end, Acks, {QueueStates0, []}), + ok = notify_limiter(State#ch.limiter, Acks), {State#ch{queue_states = QueueStates}, Actions}. incr_queue_stats(QName, MsgIds, State = #ch{queue_states = QueueStates}) -> |
