summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2022-03-20 16:29:17 +0400
committerGitHub <noreply@github.com>2022-03-20 16:29:17 +0400
commitbb541e57caa7044ec877482c49eff55b6ac02a73 (patch)
tree506845b01b583468af5006cb3368d721383cc865
parent5cb81796cbd521df4e0336bfc6c1595d4d7b04bf (diff)
parenta43d9890d51537afde003e66f7e808e3a03fb298 (diff)
downloadrabbitmq-server-git-bb541e57caa7044ec877482c49eff55b6ac02a73.tar.gz
Merge pull request #4307 from rabbitmq/mk-rabbit-channel-naming-borrowed-from-rabbitmq-server-4004-and-4305
Naming from #4305
-rw-r--r--deps/rabbit/src/rabbit_channel.erl71
1 files changed, 42 insertions, 29 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 7fbaf2e499..199f226a21 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -120,11 +120,18 @@
writer_gc_threshold
}).
--record(pending_ack, {delivery_tag,
+-record(pending_ack, {
+ %% delivery identifier used by clients
+ %% to acknowledge and reject deliveries
+ delivery_tag,
+ %% consumer tag
tag,
delivered_at,
- queue, %% queue name
- msg_id}).
+ %% queue name
+ queue,
+ %% message ID used by queue and message store implementations
+ msg_id
+ }).
-record(ch, {cfg :: #conf{},
%% limiter state, see rabbit_limiter
@@ -1342,7 +1349,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply, case Tx of
- none -> {State2, Actions} = ack(Acked, State1),
+ none -> {State2, Actions} = settle_acks(Acked, State1),
handle_queue_actions(Actions, State2);
{Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
State1#ch{tx = {Msgs, Acks1}}
@@ -1722,7 +1729,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
{State2, Actions2} =
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
- {Acc0, Actions0} = ack(Rev(A), Acc),
+ {Acc0, Actions0} = settle_acks(Rev(A), Acc),
{Acc0, Actions ++ Actions0};
({Requeue, A}, {Acc, Actions}) ->
{Acc0, Actions0} = internal_reject(Requeue, Rev(A), Limiter, Acc),
@@ -2029,37 +2036,43 @@ record_sent(Type, QueueType, Tag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
-%% NB: returns acks in youngest-first order
-collect_acks(Q, 0, true) ->
- {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
-collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], [], Q, DeliveryTag, Multiple).
-
-collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case ?QUEUE:out(Q) of
- {{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDeliveryTag}},
- QTail} ->
- if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc],
- case PrefixAcc of
- [] -> QTail;
+%% Records a client-sent acknowledgement. Handles both single delivery acks
+%% and multi-acks.
+%%
+%% Returns a triple of acknowledged pending acks, remaining pending acks,
+%% and outdated pending acks (if any).
+%% Sorts each group in the youngest-first order (ascending by delivery tag).
+collect_acks(UAMQ, 0, true) ->
+ {lists:reverse(?QUEUE:to_list(UAMQ)), ?QUEUE:new()};
+collect_acks(UAMQ, DeliveryTag, Multiple) ->
+ collect_acks([], [], UAMQ, DeliveryTag, Multiple).
+
+collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
+ case ?QUEUE:out(UAMQ) of
+ {{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT}},
+ UAMQTail} ->
+ if CurrentDT == DeliveryTag ->
+ {[UnackedMsg | AcknowledgedAcc],
+ case RemainingAcc of
+ [] -> UAMQTail;
_ -> ?QUEUE:join(
- ?QUEUE:from_list(lists:reverse(PrefixAcc)),
- QTail)
+ ?QUEUE:from_list(lists:reverse(RemainingAcc)),
+ UAMQTail)
end};
Multiple ->
- collect_acks([UnackedMsg | ToAcc], PrefixAcc,
- QTail, DeliveryTag, Multiple);
+ collect_acks([UnackedMsg | AcknowledgedAcc], RemainingAcc,
+ UAMQTail, DeliveryTag, Multiple);
true ->
- collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
- QTail, DeliveryTag, Multiple)
+ collect_acks(AcknowledgedAcc, [UnackedMsg | RemainingAcc],
+ UAMQTail, DeliveryTag, Multiple)
end;
{empty, _} ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-%% NB: Acked is in youngest-first order
-ack(Acked, State = #ch{queue_states = QueueStates0}) ->
+%% Settles (acknowledges) messages at the queue replica process level.
+%% This happens in the youngest-first order (ascending by delivery tag).
+settle_acks(Acks, State = #ch{queue_states = QueueStates0}) ->
{QueueStates, Actions} =
foreach_per_queue(
fun ({QRef, CTag}, MsgIds, {Acc0, ActionsAcc0}) ->
@@ -2071,8 +2084,8 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
{protocol_error, ErrorType, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs)
end
- end, Acked, {QueueStates0, []}),
- ok = notify_limiter(State#ch.limiter, Acked),
+ end, Acks, {QueueStates0, []}),
+ ok = notify_limiter(State#ch.limiter, Acks),
{State#ch{queue_states = QueueStates}, Actions}.
incr_queue_stats(QName, MsgIds, State = #ch{queue_states = QueueStates}) ->