summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Fox <tim@rabbitmq.com>2011-02-16 14:02:22 +0000
committerTim Fox <tim@rabbitmq.com>2011-02-16 14:02:22 +0000
commit9ca6b8dfa7526c1dcbde61978070e902d2b76986 (patch)
tree43df8a547c2e5f38ce5e3cdb9424abaed43d9f88
parentaac99fd42ed9c8bb7931f92812d12b9cdb63c760 (diff)
downloadrabbitmq-server-git-9ca6b8dfa7526c1dcbde61978070e902d2b76986.tar.gz
optimised removal of confirms in queue deletion, by introducing a dict of qpid->msg_seq_nos to enable fast lookup of all messages for a particular queue
-rw-r--r--src/rabbit_channel.erl76
1 files changed, 44 insertions, 32 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff3e..f49dbd9329 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,7 +34,8 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed,
+ queue_unconfirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -174,7 +175,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
- confirmed = []},
+ confirmed = [],
+ queue_unconfirmed = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -278,19 +280,18 @@ handle_info(timeout, State) ->
noreply(State);
handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
+ State = #ch{queue_unconfirmed = QU}) ->
+ MsgSeqNos = case dict:find(QPid, QU) of
+ {ok, MsgSet} -> gb_sets:to_list(MsgSet);
+ error -> []
+ end,
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+ State2 = (case Reason of
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
+ end)(MXs, State1),
+ noreply(queue_blocked(QPid, State2)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -476,13 +477,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
- Acc;
-remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
- remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
- State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -495,25 +489,39 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} =
+confirm(MsgSeqNos, QPid, State) ->
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ record_confirms(MXs, State1).
+
+process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed = UC, queue_unconfirmed = QU}) ->
+ {MXs, UC1, QU1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ fun(MsgSeqNo, {_DMs, UC0, _QU} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
{value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
end
- end, {[], UC}, MsgSeqNos),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ end, {[], UC, QU}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}}.
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
+remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, QU}, State) ->
Qs1 = sets:del_element(QPid, Qs),
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
+
+ case dict:find(QPid, QU) of
+ {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs),
+ case gb_sets:is_empty(Msgs1) of
+ true -> QU1 = dict:erase(QPid, QU);
+ false -> QU1 = dict:store(QPid, Msgs1, QU)
+ end;
+ _ -> QU1 = QU
+ end,
+
case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), QU1};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), QU1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1250,10 +1258,14 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC} = State,
- [maybe_monitor(QPid) || QPid <- QPids],
+ #ch{unconfirmed = UC, queue_unconfirmed = QU} = State,
UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- State#ch{unconfirmed = UC1}.
+ QU1 = lists:foldl(fun (QPid, QU2) ->
+ maybe_monitor(QPid),
+ dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo, Msgs) end,
+ gb_sets:singleton(MsgSeqNo), QU2)
+ end, QU, QPids),
+ State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};