diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-12 12:20:50 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-12 12:20:50 +0100 |
| commit | 134eab808bd1fb6e408e964df83c59a7535b6cca (patch) | |
| tree | 112a49fccb67a837b9ce318779ea4e0f3a802372 /src | |
| parent | c229634a282a604c998a6d1936d9b33579bdebab (diff) | |
| download | rabbitmq-server-git-134eab808bd1fb6e408e964df83c59a7535b6cca.tar.gz | |
refactor + timer cancelled on channel terminate
When the channel terminates (for whatever reason), any outstanding
acks are simply lost. The reasoning behind this is that:
1) if the channel closed due to an exception, it should close immediately;
2) if the client wants to close the channel, but receive all of the
acks first, it should wait for the acks and only then send
channel.close.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b5c5d8a784..fe2cbbda51 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -259,14 +259,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> {noreply, queue_blocked(QPid, State)}; handle_info(multiple_ack_flush, State = #ch{ writer_pid = WriterPid, - confirm = #confirm{held_acks = As} } ) -> + confirm = C = #confirm{held_acks = As} } ) -> rabbit_log:info("channel got a multiple_ack_flush message~n" "held acks: ~p~n", [gb_sets:to_list(As)]), case gb_sets:is_empty(As) of true -> ok; false -> flush_multiple(As, WriterPid) end, - {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}. + {noreply, State#ch{confirm = C#confirm{ held_acks = gb_sets:new()}}}. flush_multiple(Acks, WriterPid) -> [First | Rest] = gb_sets:to_list(Acks), @@ -501,10 +501,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State), - State1 = State#ch{ confirm = Confirm1 }, {noreply, case TxnKey of - none -> State1; - _ -> add_tx_participants(DeliveredQPids, State1) + none -> State#ch{confirm = Confirm1}; + _ -> add_tx_participants(DeliveredQPids, State) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -892,19 +891,17 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, - State = #ch{transaction_id = none, - confirm = #confirm{enabled = false}}) -> - {reply, #'tx.select_ok'{}, new_tx(State)}; - -handle_method(#'tx.select'{}, _, - State = #ch{confirm = #confirm{enabled = false}}) -> - {reply, #'tx.select_ok'{}, State}; -handle_method(#'tx.select'{}, _, _State) -> +handle_method(#'tx.select'{}, _, #ch{confirm = #confirm{enabled = false}}) -> rabbit_misc:protocol_error( precondition_failed, "a confirm channel cannot be made transactional", []); +handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> + {reply, #'tx.select_ok'{}, new_tx(State)}; + +handle_method(#'tx.select'{}, _, State) -> + {reply, #'tx.select_ok'{}, State}; + handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); @@ -919,18 +916,21 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'confirm.select'{multiple = Multiple, - nowait = NoWait}, +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "transactional channel cannot be made confirm", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _, - State = #ch{ transaction_id = none, - confirm = C = #confirm{enabled = false}}) -> + State = #ch{confirm = C = #confirm{enabled = false}}) -> rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", [Multiple, NoWait]), - TRef = case Multiple of - false -> not_started; - true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL, - multiple_ack_flush) - end, + {ok, TRef} = case Multiple of + false -> {ok, not_started}; + true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL, + multiple_ack_flush) + end, State1 = State#ch{confirm = C#confirm{ enabled = true, multiple = Multiple, tref = TRef }}, @@ -954,9 +954,6 @@ handle_method(#'confirm.select'{}, #ch{confirm = #confirm{enabled = true}}) -> rabbit_misc:protocol_error( precondition_failed, "cannot change confirm channel multiple setting", []); -handle_method(#'confirm.select'{}, _, _State) -> - rabbit_misc:protocol_error( - precondition_failed, "transactional channel cannot be made confirm", []); handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> @@ -1198,7 +1195,14 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + confirm = #confirm {tref = TRef}}) -> + case TRef of + not_started -> ok; + _ -> + rabbit_log:info("Canceling multiple ack timer: ~p~n", [TRef]), + {ok, cancel} = timer:cancel(TRef) + end, pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]), rabbit_writer:shutdown(WriterPid), |
