diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 12:30:11 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 12:30:11 +0100 |
| commit | 4ff1f35bb701a45443d14279bf8772382046854e (patch) | |
| tree | b0e2e729886d538c27d7f2c41257a2568cdfdac0 /src | |
| parent | 7678f8aeb43c9a3d08229efaa217027548296a13 (diff) | |
| download | rabbitmq-server-git-4ff1f35bb701a45443d14279bf8772382046854e.tar.gz | |
improved compiling- and working-ness
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 67 |
1 files changed, 33 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4b48c4706..44b0ae77ff 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -580,27 +580,18 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. -add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). - remove_consumer(ChPid, ConsumerTag, Queue) -> - queue:filter(fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) + queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). remove_consumers(ChPid, Queue) -> - {Kept, Removed} = split_by_channel(ChPid, Queue), - [emit_consumer_deleted(Ch, CTag) || - {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], - Kept. - -move_consumers(ChPid, From, To) -> - {Kept, Removed} = split_by_channel(ChPid, From), - {Kept, queue:join(To, Removed)}. - -split_by_channel(ChPid, Queue) -> - {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue)), - {queue:from_list(Kept), queue:from_list(Removed)}. + queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> + emit_consumer_deleted(ChPid, CTag), + false; + (_) -> + true + end, Queue). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -628,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = Blocked} -> + _ = remove_consumers(ChPid, Blocked), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -636,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}, + ChPid, State#q.active_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -661,8 +653,10 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso - queue:is_empty(State#q.blocked_consumers). +consumer_count() -> + lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + +is_unused(_State) -> consumer_count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -789,8 +783,8 @@ i(messages_unacknowledged, _) -> i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); -i(consumers, State) -> - queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); +i(consumers, _) -> + consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -806,13 +800,14 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> +consumers(#q{active_consumers = ActiveConsumers}) -> rabbit_misc:queue_fold( fun ({ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}, Acc) -> [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)). + end, [], lists:foldl(fun (#cr{blocked_consumers = Consumers}, Acc) -> + queue:join(Acc, Consumers) + end, ActiveConsumers, all_ch_record())). emit_stats(State) -> emit_stats(State, []). @@ -999,20 +994,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C -> - update_consumer_count(C, -1), + C = #cr{blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), + update_consumer_count(C#cr{blocked_consumers = + remove_consumer(ChPid, ConsumerTag, + Blocked)}, -1), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, ConsumerTag, Holder), active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers), - blocked_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.blocked_consumers)}, + State#q.active_consumers)}, case should_auto_delete(NewState) of false -> reply(ok, ensure_expiry_timer(NewState)); true -> {stop, normal, ok, NewState} @@ -1023,6 +1017,11 @@ handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS, active_consumers = ActiveConsumers} = drop_expired_messages(ensure_expiry_timer(State)), + %% TODO: According to the spec the returned consumer count should + %% not include blocked consumers. Since we remove those from + %% ActiveConsumers lazily - when come across them when attempting + %% to deliver messages - the count we are returning here may + %% contain some blocked consumers. reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, |
