diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 15:38:09 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 15:38:09 +0000 |
| commit | 1be3c923ca50061000a7673c7970fb09f6b6fcc0 (patch) | |
| tree | 181bccd79a434d8c6cb837ff91cea54a3164e5ac | |
| parent | c521643a67de6b64adcf00557464d72404d7e6d9 (diff) | |
| download | rabbitmq-server-git-1be3c923ca50061000a7673c7970fb09f6b6fcc0.tar.gz | |
rethink grouping logic
| -rw-r--r-- | src/rabbit_channel.erl | 51 |
1 files changed, 26 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 579f109a49..52836e4037 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1235,37 +1235,38 @@ internal_flush_confirms(State, []) -> State; internal_flush_confirms(State = #ch{writer_pid = WriterPid, unconfirmed = UC}, Cs) -> - [First | Rest] = lists:usort(Cs), - LUC = case gb_sets:size(UC) of - 0 -> lists:last(Cs) + 1; - _ -> gb_sets:smallest(UC) - end, - Is = case First < LUC of - true -> {Mult, Inds} = - find_consecutive_sequence(LUC, First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, - multiple = true}), - Inds; - _ -> [First | Rest] - end, + CutOff = find_cutoff(UC, gb_sets:from_list(Cs)), + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, lists:usort(Cs)), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, ok = lists:foldl( fun(T, ok) -> rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = T}) - end, ok, Is), + end, ok, Ss), State. -%% Find longest sequence of consecutive numbers at the beginning with -%% no elements exceeding limit. -find_consecutive_sequence(_Limit, Last, []) -> - {Last, []}; -find_consecutive_sequence(Limit, Last, [N | Ns]) - when N == (Last + 1) andalso N < Limit -> - find_consecutive_sequence(Limit, N, Ns); -find_consecutive_sequence(_Limit, Last, Ns) -> - {Last, Ns}. +%% Find the smallest element in SetA, not in SetB. +find_cutoff(SetA, SetB) -> + ItA = gb_sets:iterator(SetA), + case gb_sets:next(ItA) of + none -> -1; + {Element, ItA1} -> find_cutoff(Element, ItA, SetB) + end. + +find_cutoff(Element, ItA, SetB) -> + case gb_sets:is_element(Element, SetB) of + false -> Element; + true -> case gb_sets:next(ItA) of + {Element1, ItA1} -> find_cutoff(Element1, ItA1, SetB); + none -> Element + 1 + end + end. terminate(_State) -> pg_local:leave(rabbit_channels, self()), |
