diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 76 |
1 files changed, 44 insertions, 32 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff3e..f49dbd9329 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,7 +34,8 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed}). + confirm_enabled, publish_seqno, unconfirmed, confirmed, + queue_unconfirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -174,7 +175,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, confirm_enabled = false, publish_seqno = 1, unconfirmed = gb_trees:empty(), - confirmed = []}, + confirmed = [], + queue_unconfirmed = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -278,19 +280,18 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), + State = #ch{queue_unconfirmed = QU}) -> + MsgSeqNos = case dict:find(QPid, QU) of + {ok, MsgSet} -> gb_sets:to_list(MsgSet); + error -> [] + end, + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). + State2 = (case Reason of + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 + end)(MXs, State1), + noreply(queue_blocked(QPid, State2)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -476,13 +477,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc, _State) -> - Acc; -remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> - remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), - State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -495,25 +489,39 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = +confirm(MsgSeqNos, QPid, State) -> + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + record_confirms(MXs, State1). + +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed = UC, queue_unconfirmed = QU}) -> + {MXs, UC1, QU1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + fun(MsgSeqNo, {_DMs, UC0, _QU} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of none -> Acc; {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end - end, {[], UC}, MsgSeqNos), - record_confirms(MXs, State#ch{unconfirmed = UC1}). + end, {[], UC, QU}, MsgSeqNos), + {MXs, State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}}. -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, QU}, State) -> Qs1 = sets:del_element(QPid, Qs), %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), + + case dict:find(QPid, QU) of + {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs), + case gb_sets:is_empty(Msgs1) of + true -> QU1 = dict:erase(QPid, QU); + false -> QU1 = dict:store(QPid, Msgs1, QU) + end; + _ -> QU1 = QU + end, + case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), QU1}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), QU1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1250,10 +1258,14 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC} = State, - [maybe_monitor(QPid) || QPid <- QPids], + #ch{unconfirmed = UC, queue_unconfirmed = QU} = State, UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - State#ch{unconfirmed = UC1}. + QU1 = lists:foldl(fun (QPid, QU2) -> + maybe_monitor(QPid), + dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo, Msgs) end, + gb_sets:singleton(MsgSeqNo), QU2) + end, QU, QPids), + State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; |
