summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-24 14:20:22 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-24 14:20:22 +0000
commit984b204e9754af60b09d7d0edd826298ff3e76b3 (patch)
tree66bb95f2c98c1eb7684114d1ba15e281fe224f73
parent1f8d8a6d99f3a1fef1117bbb40c9ca6da20659e8 (diff)
parent8c823dc0a8b2f59102fe136ea30db8968439838f (diff)
downloadrabbitmq-server-git-984b204e9754af60b09d7d0edd826298ff3e76b3.tar.gz
Merging bug23637 to default
-rw-r--r--src/rabbit_amqqueue_process.erl27
-rw-r--r--src/rabbit_variable_queue.erl8
2 files changed, 15 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e794b4aa1e..44053593b6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -410,27 +410,22 @@ confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
fun(Guid, {CMs, GTC0}) ->
case dict:find(Guid, GTC0) of
{ok, {ChPid, MsgSeqNo}} ->
- {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(Guid, GTC0)};
_ ->
{CMs, GTC0}
end
- end, {[], GTC}, Guids),
- case lists:usort(CMs) of
- [{Ch, MsgSeqNo} | CMs1] ->
- [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
- {ChPid, MsgSeqNos} <- group_confirms_by_channel(
- CMs1, [{Ch, [MsgSeqNo]}])];
- [] ->
- ok
- end,
+ end, {gb_trees:empty(), GTC}, Guids),
+ gb_trees:map(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
State#q{guid_to_channel = GTC1}.
-group_confirms_by_channel([], Acc) ->
- Acc;
-group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
-group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
{no_confirm, State};
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d56072..ea7caba8c8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1447,8 +1447,8 @@ msgs_written_to_disk(QPid, GuidSet, written) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, GuidSet)) })
end).
msg_indices_written_to_disk(QPid, GuidSet) ->
@@ -1459,8 +1459,8 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, GuidSet)) })
end).
%%----------------------------------------------------------------------------