summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-18 12:30:11 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-18 12:30:11 +0100
commit4ff1f35bb701a45443d14279bf8772382046854e (patch)
treeb0e2e729886d538c27d7f2c41257a2568cdfdac0 /src
parent7678f8aeb43c9a3d08229efaa217027548296a13 (diff)
downloadrabbitmq-server-git-4ff1f35bb701a45443d14279bf8772382046854e.tar.gz
improved compiling- and working-ness
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl67
1 files changed, 33 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e4b48c4706..44b0ae77ff 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -580,27 +580,18 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
-add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
- {Kept, Removed} = split_by_channel(ChPid, Queue),
- [emit_consumer_deleted(Ch, CTag) ||
- {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
- Kept.
-
-move_consumers(ChPid, From, To) ->
- {Kept, Removed} = split_by_channel(ChPid, From),
- {Kept, queue:join(To, Removed)}.
-
-split_by_channel(ChPid, Queue) ->
- {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue)),
- {queue:from_list(Kept), queue:from_list(Removed)}.
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -628,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = Blocked} ->
+ _ = remove_consumers(ChPid, Blocked), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -636,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)},
+ ChPid, State#q.active_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -661,8 +653,10 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
- queue:is_empty(State#q.blocked_consumers).
+consumer_count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+is_unused(_State) -> consumer_count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -789,8 +783,8 @@ i(messages_unacknowledged, _) ->
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
-i(consumers, State) ->
- queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
+i(consumers, _) ->
+ consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -806,13 +800,14 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+consumers(#q{active_consumers = ActiveConsumers}) ->
rabbit_misc:queue_fold(
fun ({ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}, Acc) ->
[{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ end, [], lists:foldl(fun (#cr{blocked_consumers = Consumers}, Acc) ->
+ queue:join(Acc, Consumers)
+ end, ActiveConsumers, all_ch_record())).
emit_stats(State) ->
emit_stats(State, []).
@@ -999,20 +994,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C ->
- update_consumer_count(C, -1),
+ C = #cr{blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
+ update_consumer_count(C#cr{blocked_consumers =
+ remove_consumer(ChPid, ConsumerTag,
+ Blocked)}, -1),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
ConsumerTag,
Holder),
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.blocked_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(NewState) of
false -> reply(ok, ensure_expiry_timer(NewState));
true -> {stop, normal, ok, NewState}
@@ -1023,6 +1017,11 @@ handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
active_consumers = ActiveConsumers} =
drop_expired_messages(ensure_expiry_timer(State)),
+ %% TODO: According to the spec the returned consumer count should
+ %% not include blocked consumers. Since we remove those from
+ %% ActiveConsumers lazily - when come across them when attempting
+ %% to deliver messages - the count we are returning here may
+ %% contain some blocked consumers.
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,