diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 12:10:48 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-01 12:10:48 +0100 |
| commit | 05e18dc26cbe7e4424f6fa2ec0ce4bee01ce2a94 (patch) | |
| tree | df2744fbdad8f9c37ce3856dc5129f4c06a1ceb3 | |
| parent | 33c43124c18b43ade8239e7e5d1881bf5ecd9000 (diff) | |
| download | rabbitmq-server-git-05e18dc26cbe7e4424f6fa2ec0ce4bee01ce2a94.tar.gz | |
keep track of uncommitted acks in a list instead of a queue
This is simpler.
| -rw-r--r-- | src/rabbit_channel.erl | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 660abb313f..7c85132d05 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,7 +34,7 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_ack_q, + unacked_message_q, uncommitted_message_q, uncommitted_acks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -185,7 +185,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new(), + uncommitted_acks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -669,10 +669,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {noreply, case TxStatus of none -> ack(Acked, State1); - in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, - queue:from_list( - lists:reverse(Acked))), - State1#ch{uncommitted_ack_q = NewTAQ} + in_progress -> State1#ch{uncommitted_acks = + Acked ++ State1#ch.uncommitted_acks} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -1067,20 +1065,20 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_ack_q = TAQ}) -> - State1 = new_tx(ack(queue:to_list(TAQ), - rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), + uncommitted_acks = TAL}) -> + State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, + State, TMQ))), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = TAQ}) -> - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_acks = TAL}) -> + TAQ = queue:from_list(lists:reverse(TAL)), + {reply, #'tx.rollback_ok'{}, + new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( @@ -1311,7 +1309,7 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new()}. + uncommitted_acks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1493,8 +1491,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> - queue:len(TAQ); +i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> + length(TAL); i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> |
