diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-31 11:56:22 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-31 11:56:22 +0100 |
| commit | 0d726de461fd6afc4755ce03a803181fec47e51c (patch) | |
| tree | 11fd5f9399dd25ebe42e2e1635b0607cccc10f09 | |
| parent | 4c04aeb19191abeed94e5de11605432311ba5a43 (diff) | |
| download | rabbitmq-server-git-0d726de461fd6afc4755ce03a803181fec47e51c.tar.gz | |
pending acks are sent out when the channel becomes idle or every 1s
| -rw-r--r-- | src/rabbit_channel.erl | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0964700c29..bebb59a6f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -70,7 +70,7 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(MULTIPLE_ACK_FLUSH_INTERVAL, 5000). +-define(MULTIPLE_ACK_FLUSH_INTERVAL, 1000). %%---------------------------------------------------------------------------- @@ -266,13 +266,7 @@ handle_cast(multiple_ack_flush, State = #ch{writer_pid = WriterPid, held_confirms = As, need_confirming = NA}) -> - case gb_sets:is_empty(As) of - true -> ok; % this should never be the case - false -> flush_multiple(As, WriterPid, case gb_sets:is_empty(NA) of - false -> gb_sets:smallest(NA); - true -> gb_sets:largest(As)+1 - end) - end, + handle_multiple_flush(WriterPid, As, NA), {noreply, State #ch { held_confirms = gb_sets:new(), confirm_tref = undefined }}; @@ -299,9 +293,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1)}. -handle_pre_hibernate(State) -> +handle_pre_hibernate(State = #ch { writer_pid = WriterPid, + held_confirms = As, + need_confirming = NA }) -> ok = clear_permission_cache(), - {hibernate, stop_stats_timer(State)}. + handle_multiple_flush(WriterPid, As, NA), + {hibernate, stop_stats_timer( + State #ch { held_confirms = gb_sets:new(), + confirm_tref = undefined })}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -1354,6 +1353,16 @@ stop_ack_timer(State = #ch{confirm_tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State #ch { confirm_tref = undefined }. +handle_multiple_flush(WriterPid, As, NA) -> + case gb_sets:is_empty(As) of + true -> ok; + false -> flush_multiple(As, WriterPid, case gb_sets:is_empty(NA) of + false -> gb_sets:smallest(NA); + true -> gb_sets:largest(As)+1 + end) + end. + + flush_multiple(Acks, WriterPid, SmallestNotAcked) -> [First | Rest] = gb_sets:to_list(Acks), Remaining = case Rest of |
