diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-02-08 12:00:06 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-02-08 12:00:06 +0000 |
| commit | e848ee7fccccfc0f4821aa6b2613524a962c1a69 (patch) | |
| tree | 06285585d5f8cb45e763ab5a54c9fc78ab3a8581 /src | |
| parent | a80e6eb7008ea135fbf883573d1a2e20fc95074f (diff) | |
| parent | 8d67fce577d3c61e35cb82703f01dd6bdb00026c (diff) | |
| download | rabbitmq-server-git-e848ee7fccccfc0f4821aa6b2613524a962c1a69.tar.gz | |
Merge bug24234
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 76fbc73c63..282d7fb8c2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -1063,10 +1064,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> +handle_method(#'tx.commit'{}, _, + State = #ch{uncommitted_message_q = TMQ, + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), ack(TAL, State1), + lists:foreach( + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1074,8 +1080,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:append([L || {_, L} <- TNL]), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1239,14 +1247,26 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject(DeliveryTag, Requeue, Multiple, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> + reject(Requeue, Acked, State1#ch.limiter), + State1; + in_progress -> + State1#ch{uncommitted_nacks = + [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + end}. + +reject(Requeue, Acked, Limiter) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. + ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, Msg = {_QName, QPid, MsgId, Redelivered, _Message}, @@ -1301,7 +1321,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; |
