summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-10 15:38:09 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-10 15:38:09 +0000
commit1be3c923ca50061000a7673c7970fb09f6b6fcc0 (patch)
tree181bccd79a434d8c6cb837ff91cea54a3164e5ac
parentc521643a67de6b64adcf00557464d72404d7e6d9 (diff)
downloadrabbitmq-server-git-1be3c923ca50061000a7673c7970fb09f6b6fcc0.tar.gz
rethink grouping logic
-rw-r--r--src/rabbit_channel.erl51
1 files changed, 26 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 579f109a49..52836e4037 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1235,37 +1235,38 @@ internal_flush_confirms(State, []) ->
State;
internal_flush_confirms(State = #ch{writer_pid = WriterPid,
unconfirmed = UC}, Cs) ->
- [First | Rest] = lists:usort(Cs),
- LUC = case gb_sets:size(UC) of
- 0 -> lists:last(Cs) + 1;
- _ -> gb_sets:smallest(UC)
- end,
- Is = case First < LUC of
- true -> {Mult, Inds} =
- find_consecutive_sequence(LUC, First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult,
- multiple = true}),
- Inds;
- _ -> [First | Rest]
- end,
+ CutOff = find_cutoff(UC, gb_sets:from_list(Cs)),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, lists:usort(Cs)),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
ok = lists:foldl(
fun(T, ok) -> rabbit_writer:send_command(
WriterPid,
#'basic.ack'{delivery_tag = T})
- end, ok, Is),
+ end, ok, Ss),
State.
-%% Find longest sequence of consecutive numbers at the beginning with
-%% no elements exceeding limit.
-find_consecutive_sequence(_Limit, Last, []) ->
- {Last, []};
-find_consecutive_sequence(Limit, Last, [N | Ns])
- when N == (Last + 1) andalso N < Limit ->
- find_consecutive_sequence(Limit, N, Ns);
-find_consecutive_sequence(_Limit, Last, Ns) ->
- {Last, Ns}.
+%% Find the smallest element in SetA, not in SetB.
+find_cutoff(SetA, SetB) ->
+ ItA = gb_sets:iterator(SetA),
+ case gb_sets:next(ItA) of
+ none -> -1;
+ {Element, ItA1} -> find_cutoff(Element, ItA, SetB)
+ end.
+
+find_cutoff(Element, ItA, SetB) ->
+ case gb_sets:is_element(Element, SetB) of
+ false -> Element;
+ true -> case gb_sets:next(ItA) of
+ {Element1, ItA1} -> find_cutoff(Element1, ItA1, SetB);
+ none -> Element + 1
+ end
+ end.
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),