summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 14:08:34 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-12 14:08:34 +0100
commit8099cd980949f7d7d5fe6972f01d624e8cde615e (patch)
tree529de39e44fc1a2e0fa3930d933fed8dbeb45e51 /src
parent134eab808bd1fb6e408e964df83c59a7535b6cca (diff)
downloadrabbitmq-server-git-8099cd980949f7d7d5fe6972f01d624e8cde615e.tar.gz
refactor timer code; looks more like msg_store now
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl161
1 files changed, 86 insertions, 75 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe2cbbda51..bebfc9b736 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1]).
+-export([emit_stats/1, flush/1, flush_multiple_acks/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -152,6 +152,10 @@ emit_stats(Pid) ->
flush(Pid) ->
gen_server2:call(Pid, flush).
+flush_multiple_acks(Pid) ->
+ gen_server2:cast(Pid, multiple_ack_flush).
+
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
@@ -175,11 +179,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer(),
- confirm = #confirm{ enabled = false,
- count = 0,
- multiple = false,
- tref = not_started,
- held_acks = gb_sets:new()}},
+ confirm = #confirm{enabled = false,
+ count = 0,
+ multiple = false,
+ held_acks = gb_sets:new()}},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -246,7 +249,19 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
handle_cast(emit_stats, State) ->
internal_emit_stats(State),
- {noreply, State}.
+ {noreply, State};
+
+handle_cast(multiple_ack_flush,
+ State = #ch{writer_pid = WriterPid,
+ confirm = C = #confirm{held_acks = As}}) ->
+ rabbit_log:info("channel got a multiple_ack_flush message~n"
+ "held acks: ~p~n", [gb_sets:to_list(As)]),
+ case gb_sets:is_empty(As) of
+ true -> ok;
+ false -> flush_multiple(As, WriterPid)
+ end,
+ {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
+ tref = undefined}}}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -256,41 +271,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State)};
-handle_info(multiple_ack_flush,
- State = #ch{ writer_pid = WriterPid,
- confirm = C = #confirm{held_acks = As} } ) ->
- rabbit_log:info("channel got a multiple_ack_flush message~n"
- "held acks: ~p~n", [gb_sets:to_list(As)]),
- case gb_sets:is_empty(As) of
- true -> ok;
- false -> flush_multiple(As, WriterPid)
- end,
- {noreply, State#ch{confirm = C#confirm{ held_acks = gb_sets:new()}}}.
-
-flush_multiple(Acks, WriterPid) ->
- [First | Rest] = gb_sets:to_list(Acks),
- flush_multiple(First, Rest, WriterPid).
-
-flush_multiple(Prev, [Cur | Rest], WriterPid) ->
- ExpNext = Prev+1,
- case Cur of
- ExpNext ->
- flush_multiple(Cur, Rest, WriterPid);
- _ ->
- flush_multiple(Prev, [], WriterPid),
- lists:foreach(fun(A) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = A})
- end, [Cur | Rest])
- end;
-flush_multiple(Prev, [], WriterPid) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{ delivery_tag = Prev,
- multiple = true }).
-
+ {noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -434,21 +415,24 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-handle_confirm(C = #confirm{ enabled = false }, _, _) ->
- C;
-handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) ->
+handle_confirm(State = #ch{confirm = #confirm{enabled = false}}, _) ->
+ State;
+handle_confirm(State = #ch{writer_pid = WriterPid,
+ confirm = C = #confirm{ count = Count, multiple = false}},
+ false) ->
rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]),
ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
- C#confirm{ count = Count+1 };
-handle_confirm(C = #confirm{ count = Count,
- multiple = true,
- held_acks = As }, false, _WriterPid) ->
+ State#ch{confirm = C#confirm{ count = Count+1 }};
+handle_confirm(State = #ch{confirm = C = #confirm{count = Count,
+ multiple = true,
+ held_acks = As}}, false) ->
rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]),
- C#confirm{ count = Count+1,
- held_acks = gb_sets:add(Count, As) };
-handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) ->
+ State1 = start_ack_timer(State),
+ State1#ch{confirm = C#confirm{count = Count+1,
+ held_acks = gb_sets:add(Count, As)}};
+handle_confirm(State = #ch{confirm = C = #confirm{count = Count}}, IsPersistent) ->
rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]),
- C#confirm{ count = Count+1 }.
+ State#ch{confirm = C#confirm{count = Count+1}}.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -474,8 +458,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid,
- confirm = Confirm}) ->
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -483,7 +466,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- Confirm1 = handle_confirm(Confirm, IsPersistent, WriterPid),
+ State1 = handle_confirm(State, IsPersistent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -500,10 +483,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State),
+ QPid <- DeliveredQPids]], publish, State1),
{noreply, case TxnKey of
- none -> State#ch{confirm = Confirm1};
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State1;
+ _ -> add_tx_participants(DeliveredQPids, State1)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -926,14 +909,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
State = #ch{confirm = C = #confirm{enabled = false}}) ->
rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
[Multiple, NoWait]),
- {ok, TRef} = case Multiple of
- false -> {ok, not_started};
- true -> timer:send_interval(?MULTIPLE_ACK_FLUSH_INTERVAL,
- multiple_ack_flush)
- end,
- State1 = State#ch{confirm = C#confirm{ enabled = true,
- multiple = Multiple,
- tref = TRef }},
+ State1 = State#ch{confirm = C#confirm{enabled = true,
+ multiple = Multiple}},
case NoWait of
true -> {noreply, State1};
false -> {reply, #'confirm.select_ok'{}, State1}
@@ -1195,14 +1172,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
- confirm = #confirm {tref = TRef}}) ->
- case TRef of
- not_started -> ok;
- _ ->
- rabbit_log:info("Canceling multiple ack timer: ~p~n", [TRef]),
- {ok, cancel} = timer:cancel(TRef)
- end,
+terminate(State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+ stop_ack_timer(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]),
rabbit_writer:shutdown(WriterPid),
@@ -1284,3 +1255,43 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+
+start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) ->
+ rabbit_log:info("starting ack timer...~n"),
+ {ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL,
+ ?MODULE, flush_multiple_acks, [self()]),
+ State#ch{confirm = C#confirm{tref = TRef}};
+start_ack_timer(State) ->
+ rabbit_log:info("timer already started.. nop~n"),
+ State.
+
+stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) ->
+ rabbit_log:info("stopping a stopped ack timer.. nop~n"),
+ State;
+stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) ->
+ rabbit_log:info("canceling ack timer: ~p~n", [TRef]),
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm = C#confirm{tref = undefined}}.
+
+flush_multiple(Acks, WriterPid) ->
+ [First | Rest] = gb_sets:to_list(Acks),
+ flush_multiple(First, Rest, WriterPid).
+
+flush_multiple(Prev, [Cur | Rest], WriterPid) ->
+ ExpNext = Prev+1,
+ case Cur of
+ ExpNext ->
+ flush_multiple(Cur, Rest, WriterPid);
+ _ ->
+ flush_multiple(Prev, [], WriterPid),
+ lists:foreach(fun(A) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = A})
+ end, [Cur | Rest])
+ end;
+flush_multiple(Prev, [], WriterPid) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{ delivery_tag = Prev,
+ multiple = true }).