summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-31 11:56:22 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-31 11:56:22 +0100
commit0d726de461fd6afc4755ce03a803181fec47e51c (patch)
tree11fd5f9399dd25ebe42e2e1635b0607cccc10f09
parent4c04aeb19191abeed94e5de11605432311ba5a43 (diff)
downloadrabbitmq-server-git-0d726de461fd6afc4755ce03a803181fec47e51c.tar.gz
pending acks are sent out when the channel becomes idle or every 1s
-rw-r--r--src/rabbit_channel.erl29
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0964700c29..bebb59a6f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -70,7 +70,7 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(MULTIPLE_ACK_FLUSH_INTERVAL, 5000).
+-define(MULTIPLE_ACK_FLUSH_INTERVAL, 1000).
%%----------------------------------------------------------------------------
@@ -266,13 +266,7 @@ handle_cast(multiple_ack_flush,
State = #ch{writer_pid = WriterPid,
held_confirms = As,
need_confirming = NA}) ->
- case gb_sets:is_empty(As) of
- true -> ok; % this should never be the case
- false -> flush_multiple(As, WriterPid, case gb_sets:is_empty(NA) of
- false -> gb_sets:smallest(NA);
- true -> gb_sets:largest(As)+1
- end)
- end,
+ handle_multiple_flush(WriterPid, As, NA),
{noreply, State #ch { held_confirms = gb_sets:new(),
confirm_tref = undefined }};
@@ -299,9 +293,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1)}.
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State = #ch { writer_pid = WriterPid,
+ held_confirms = As,
+ need_confirming = NA }) ->
ok = clear_permission_cache(),
- {hibernate, stop_stats_timer(State)}.
+ handle_multiple_flush(WriterPid, As, NA),
+ {hibernate, stop_stats_timer(
+ State #ch { held_confirms = gb_sets:new(),
+ confirm_tref = undefined })}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -1354,6 +1353,16 @@ stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State #ch { confirm_tref = undefined }.
+handle_multiple_flush(WriterPid, As, NA) ->
+ case gb_sets:is_empty(As) of
+ true -> ok;
+ false -> flush_multiple(As, WriterPid, case gb_sets:is_empty(NA) of
+ false -> gb_sets:smallest(NA);
+ true -> gb_sets:largest(As)+1
+ end)
+ end.
+
+
flush_multiple(Acks, WriterPid, SmallestNotAcked) ->
[First | Rest] = gb_sets:to_list(Acks),
Remaining = case Rest of