diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 14:34:09 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 14:34:09 +0100 |
| commit | 94d35f70d379c8fd8fa8abd31af441c7ab0002eb (patch) | |
| tree | acd83dbcdf461000ba9fb845b6c9ff00dfb18daf /src | |
| parent | eaedf1fc10f2c754657e337a151102e2257ccc06 (diff) | |
| download | rabbitmq-server-git-94d35f70d379c8fd8fa8abd31af441c7ab0002eb.tar.gz | |
consumer_monitors is a dict again
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 46 |
1 files changed, 24 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c73d85ac05..6e57ff9fe3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -192,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = gb_sets:new(), - consumer_monitors = gb_sets:new(), + consumer_monitors = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -330,7 +330,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State1 = handle_publishing_queue_down(QPid, Reason, State), erase_queue_stats(QPid), State2 = queue_blocked(QPid, State1), - State3 = case gb_sets:is_member(QPid, ConsumerMonitors) of + State3 = case dict:is_key(QPid, ConsumerMonitors) of false -> State2; true -> handle_consuming_queue_down(QPid, State1) end, @@ -782,14 +782,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); {ok, Q = #amqqueue{pid = QPid}} -> - ConsumerMapping1 = dict:erase(ConsumerTag,ConsumerMapping), + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), ConsumerMonitors1 = - case dict:size(dict:filter(fun (_, #amqqueue{pid = QPid0}) - when QPid0 =:= QPid -> true; - (_, _) -> false - end, ConsumerMapping1)) of - 0 -> gb_sets:delete(QPid, ConsumerMonitors); - _ -> ConsumerMonitors + case dict:find(QPid, ConsumerMonitors) of + error -> ConsumerMonitors; + {ok, CTags} -> case gb_sets:size(CTags) of + 1 -> dict:erase(QPid, ConsumerMonitors); + _ -> dict:store(QPid, gb_sets:delete(ConsumerTag, CTags), ConsumerMonitors) + end end, State1 = State#ch{consumer_mapping = ConsumerMapping1, consumer_monitors = ConsumerMonitors1}, @@ -1155,9 +1155,12 @@ consumer_monitor(ConsumerTag, Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - monitor_queue(QPid, - State#ch{consumer_monitors = - gb_sets:add(QPid, ConsumerMonitors)}); + ConsumerMonitors1 = + dict:update(QPid, + fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end, + gb_sets:singleton(ConsumerTag), + ConsumerMonitors), + monitor_queue(QPid, State#ch{consumer_monitors = ConsumerMonitors1}); _ -> State end. @@ -1178,7 +1181,7 @@ queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, blocking = Blocking, unconfirmed_qm = UQM}) -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors), + ConsumerMonitored = dict:is_key(QPid, ConsumerMonitors), QueueBlocked = gb_sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. @@ -1209,19 +1212,18 @@ handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors, writer_pid = WriterPid}) -> - {ConsumerTags, ConsumerMapping1} = - dict:fold(fun (CTag, #amqqueue{pid = QPid0}, {CTags, CMap}) - when QPid =:= QPid0 -> - {[CTag | CTags], CMap}; - (CTag, Q, {CTags, CMap}) -> - {CTags, dict:store(CTag, Q, CMap)} - end, {[], dict:new()}, ConsumerMapping), - ConsumerMonitors1 = gb_sets:delete(QPid, ConsumerMonitors), + ConsumerTags = case dict:find(QPid, ConsumerMonitors) of + error -> []; + {ok, CTags} -> CTags + end, + ConsumerMonitors1 = dict:erase(QPid, ConsumerMonitors), + ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end, + ConsumerMapping, ConsumerTags), [begin Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, nowait = true}, ok = rabbit_writer:send_command(WriterPid, Cancel) - end || ConsumerTag <- ConsumerTags], + end || ConsumerTag <- gb_sets:to_list(ConsumerTags)], State#ch{consumer_mapping = ConsumerMapping1, consumer_monitors = ConsumerMonitors1}. |
