summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl32
1 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 660abb313f..7c85132d05 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,7 +34,7 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
+ unacked_message_q, uncommitted_message_q, uncommitted_acks,
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -185,7 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new(),
+ uncommitted_acks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -669,10 +669,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{noreply,
case TxStatus of
none -> ack(Acked, State1);
- in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q,
- queue:from_list(
- lists:reverse(Acked))),
- State1#ch{uncommitted_ack_q = NewTAQ}
+ in_progress -> State1#ch{uncommitted_acks =
+ Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -1067,20 +1065,20 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_ack_q = TAQ}) ->
- State1 = new_tx(ack(queue:to_list(TAQ),
- rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
+ uncommitted_acks = TAL}) ->
+ State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = TAQ}) ->
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_acks = TAL}) ->
+ TAQ = queue:from_list(lists:reverse(TAL)),
+ {reply, #'tx.rollback_ok'{},
+ new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
@@ -1311,7 +1309,7 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new()}.
+ uncommitted_acks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1493,8 +1491,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
- queue:len(TAQ);
+i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
+ length(TAL);
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->