summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 12:20:50 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 12:20:50 +0100
commit134eab808bd1fb6e408e964df83c59a7535b6cca (patch)
tree112a49fccb67a837b9ce318779ea4e0f3a802372 /src
parentc229634a282a604c998a6d1936d9b33579bdebab (diff)
downloadrabbitmq-server-git-134eab808bd1fb6e408e964df83c59a7535b6cca.tar.gz
refactor + timer cancelled on channel terminate
When the channel terminates (for whatever reason), any outstanding acks are simply lost. The reasoning behind this is that: 1) if the channel closed due to an exception, it should close immediately; 2) if the client wants to close the channel, but receive all of the acks first, it should wait for the acks and only then send channel.close.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl58
1 files changed, 31 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b5c5d8a784..fe2cbbda51 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -259,14 +259,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
{noreply, queue_blocked(QPid, State)};
handle_info(multiple_ack_flush,
State = #ch{ writer_pid = WriterPid,
- confirm = #confirm{held_acks = As} } ) ->
+ confirm = C = #confirm{held_acks = As} } ) ->
rabbit_log:info("channel got a multiple_ack_flush message~n"
"held acks: ~p~n", [gb_sets:to_list(As)]),
case gb_sets:is_empty(As) of
true -> ok;
false -> flush_multiple(As, WriterPid)
end,
- {noreply, State#ch{confirm = #confirm{ held_acks = gb_sets:new()}}}.
+ {noreply, State#ch{confirm = C#confirm{ held_acks = gb_sets:new()}}}.
flush_multiple(Acks, WriterPid) ->
[First | Rest] = gb_sets:to_list(Acks),
@@ -501,10 +501,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State),
- State1 = State#ch{ confirm = Confirm1 },
{noreply, case TxnKey of
- none -> State1;
- _ -> add_tx_participants(DeliveredQPids, State1)
+ none -> State#ch{confirm = Confirm1};
+ _ -> add_tx_participants(DeliveredQPids, State)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -892,19 +891,17 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _,
- State = #ch{transaction_id = none,
- confirm = #confirm{enabled = false}}) ->
- {reply, #'tx.select_ok'{}, new_tx(State)};
-
-handle_method(#'tx.select'{}, _,
- State = #ch{confirm = #confirm{enabled = false}}) ->
- {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.select'{}, _, _State) ->
+handle_method(#'tx.select'{}, _, #ch{confirm = #confirm{enabled = false}}) ->
rabbit_misc:protocol_error(
precondition_failed, "a confirm channel cannot be made transactional", []);
+handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
+ {reply, #'tx.select_ok'{}, new_tx(State)};
+
+handle_method(#'tx.select'{}, _, State) ->
+ {reply, #'tx.select_ok'{}, State};
+
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
@@ -919,18 +916,21 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'confirm.select'{multiple = Multiple,
- nowait = NoWait},
+handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
+ when TxId =/= none ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "transactional channel cannot be made confirm", []);
+
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
_,
- State = #ch{ transaction_id = none,
- confirm = C = #confirm{enabled = false}}) ->
+ State = #ch{confirm = C = #confirm{enabled = false}}) ->
rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
[Multiple, NoWait]),
- TRef = case Multiple of
- false -> not_started;
- true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL,
- multiple_ack_flush)
- end,
+ {ok, TRef} = case Multiple of
+ false -> {ok, not_started};
+ true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL,
+ multiple_ack_flush)
+ end,
State1 = State#ch{confirm = C#confirm{ enabled = true,
multiple = Multiple,
tref = TRef }},
@@ -954,9 +954,6 @@ handle_method(#'confirm.select'{},
#ch{confirm = #confirm{enabled = true}}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot change confirm channel multiple setting", []);
-handle_method(#'confirm.select'{}, _, _State) ->
- rabbit_misc:protocol_error(
- precondition_failed, "transactional channel cannot be made confirm", []);
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
@@ -1198,7 +1195,14 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ confirm = #confirm {tref = TRef}}) ->
+ case TRef of
+ not_started -> ok;
+ _ ->
+ rabbit_log:info("Canceling multiple ack timer: ~p~n", [TRef]),
+ {ok, cancel} = timer:cancel(TRef)
+ end,
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]),
rabbit_writer:shutdown(WriterPid),