summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-12-23 17:09:41 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-12-23 17:09:41 +0000
commit8e4a15d5ba40deeb1f138e657b342b48e35548bb (patch)
tree76a162df874033b68773ba7e3db6d6039b22884b
parent914f214e95f93f44063f3590fea636b873e8050b (diff)
downloadrabbitmq-server-git-8e4a15d5ba40deeb1f138e657b342b48e35548bb.tar.gz
extract all channel and consumer related queue functionality
design decisions: - all limiter interactions take place in the new module. The alternative would turn into spaghetti and leave more code in amqqueue_process. - asynchronous channel interactions - namely delivery of messages and 'drained' events - happens in the new module whereas all other operations - really just sending of replies - takes place in amqqueue_process. I'm still in two minds about that. It would actually be simpler to have the rabbit_channel:deliver code in amqqueue_process, albeit in an existing closure, but send_drained is rather deeply embedded. - we need a few functions in the API that manipulate the channel record. They all require the same kind of post-processing to invoke decorators, run the message queue, etc. I decided to make this pattern explicit in the API by having just one function there - queue_consumers:possibly_unblock - that takes a closure. It has just one call site - amqqueue_process:possibly_unblock - that contains the post-processing logic. The closures are constructed by four separate API functions. There are a few complications... 1) event emission and decorator notifications These are deeply embedded in some places and need to be lifted out so they are all in the one logical place where they belong, namely amqqueue_process. In some cases this complicates the API and code a fair bit since we, e.g., need to collate information for subsequent decorator notification. This is doubly annoying since the existing decorators don't actually make use of that information, so we should really consider simplifying the decorator API. 2) message delivery logic This is a fairly complex piece of code and it gets more complicated still because a) of the above, and b) sending a message to a channel requires the queue name, so we need to pass that in. We also need the state, and indeed modify it, though this is done opaquely. And it looks like we could just pass BQ & BQS instead. 3) handle_cast/credit I reckon this should have been implemented with possibly_unblock. It is now. The behaviour isn't obviously equivalent, so this needs careful consideration. The old code would invoke 'unblock' when "(not Drain or Len /= 0) and updated channel record is not blocked". By contrast, the possibly_unblock logic, which is what the new code invokes, invokes 'unblock' when the old channel record is blocked and the updated channel record is unblocked. This would seem to be logical, and it would be weird indeed if the credit code relied on any different behaviour. 4) queue_consumer:add is a nine-arg function. urgh. Mind you, so is the handle_call/basic_consume message. There are some minimal refactors and cosmetic changes which should go straight onto 'default' and thus reduce the distance to changes in this bug.
-rw-r--r--src/rabbit_amqqueue_process.erl531
-rw-r--r--src/rabbit_queue_consumers.erl339
2 files changed, 487 insertions, 383 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7002fd367c..d9b9e92ac5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,6 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -38,7 +37,7 @@
has_had_consumers,
backing_queue,
backing_queue_state,
- active_consumers,
+ consumers,
consumer_use,
expires,
sync_timer_ref,
@@ -57,21 +56,6 @@
status
}).
--record(consumer, {tag, ack_required, args}).
-
-%% These are held in our process dictionary
--record(cr, {ch_pid,
- monitor_ref,
- acktags,
- consumer_count,
- %% Queue of {ChPid, #consumer{}} for consumers which have
- %% been blocked for any reason
- blocked_consumers,
- %% The limiter itself
- limiter,
- %% Internal flow control for queue -> writer
- unsent_message_count}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -150,7 +134,7 @@ init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = priority_queue:new(),
+ consumers = rabbit_queue_consumers:new(),
consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
@@ -235,13 +219,14 @@ notify_decorators(Event, Props, State) when Event =:= startup;
Event =:= shutdown ->
decorator_callback(qname(State), Event, Props);
-notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+notify_decorators(Event, Props, State = #q{consumers = Consumers,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- decorator_callback(
- qname(State), notify,
- [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
- {is_empty, BQ:is_empty(BQS)} | Props]]).
+ P = rabbit_queue_consumers:max_active_priority(Consumers),
+ decorator_callback(qname(State), notify,
+ [Event, [{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)} |
+ Props]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -315,7 +300,7 @@ init_max_length(MaxLen, State) ->
State1.
terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue_state = BQS} =
+ State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
fun stop_rate_timer/1,
@@ -326,8 +311,9 @@ terminate_shutdown(Fun, State) ->
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
notify_decorators(shutdown, [], State),
- [emit_consumer_deleted(Ch, CTag, QName)
- || {Ch, CTag, _} <- consumers(State1)],
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _, _} <-
+ rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -410,145 +396,46 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{active_consumers = AC}) ->
- true = (priority_queue:is_empty(AC) orelse is_empty(State)).
+assert_invariant(State = #q{consumers = Consumers}) ->
+ true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
-lookup_ch(ChPid) ->
- case get({ch, ChPid}) of
- undefined -> not_found;
- C -> C
- end.
-
-ch_record(ChPid, LimiterPid) ->
- Key = {ch, ChPid},
- case get(Key) of
- undefined -> MonitorRef = erlang:monitor(process, ChPid),
- Limiter = rabbit_limiter:client(LimiterPid),
- C = #cr{ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = queue:new(),
- consumer_count = 0,
- blocked_consumers = priority_queue:new(),
- limiter = Limiter,
- unsent_message_count = 0},
- put(Key, C),
- C;
- C = #cr{} -> C
- end.
-
-update_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {true, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
- end,
- C.
-
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C),
- ok.
-
-erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}),
- ok.
-
-all_ch_record() -> [C || {{ch, _}, C} <- get()].
-
-block_consumer(C = #cr{blocked_consumers = Blocked},
- {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
- update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}),
- notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
-
-is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
- Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
-
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
true -> notify_decorators(queue_empty, [], State),
- [send_drained(C) || C <- all_ch_record()];
+ rabbit_queue_consumers:send_drained();
false -> ok
end,
State.
-send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
- case rabbit_limiter:drained(Limiter) of
- {[], Limiter} -> ok;
- {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
- ChPid, CTagCredit),
- update_ch_record(C#cr{limiter = Limiter2})
+deliver_msgs_to_consumers(DeliverFun, Stop, State) ->
+ {Active, Blocked, State1, Consumers1} =
+ rabbit_queue_consumers:deliver(DeliverFun, Stop, qname(State), State,
+ State#q.consumers),
+ State2 = State1#q{consumers = Consumers1},
+ [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
+ {_ChPid, CTag} <- Blocked],
+ case Active of
+ true -> {true, State2};
+ false -> {false, update_consumer_use(State2, inactive)}
end.
-deliver_msgs_to_consumers(_DeliverFun, true, State) ->
- {true, State};
-deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers,
- consumer_use = CUInfo}) ->
- case priority_queue:out_p(ActiveConsumers) of
- {empty, _} ->
- {false,
- State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
- {{value, QEntry, Priority}, Tail} ->
- {Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry, Priority,
- State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(DeliverFun, Stop, State1)
- end.
-
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
- C = lookup_ch(ChPid),
- case is_ch_blocked(C) of
- true -> block_consumer(C, E, State),
- {false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter,
- Consumer#consumer.ack_required,
- Consumer#consumer.tag) of
- {suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E, State),
- {false, State};
- {continue, Limiter} ->
- AC1 = priority_queue:in(E, Priority,
- State#q.active_consumers),
- deliver_msg_to_consumer0(
- DeliverFun, Consumer, C#cr{limiter = Limiter},
- State#q{active_consumers = AC1})
- end
- end.
-
-deliver_msg_to_consumer0(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, Stop, State1} =
- DeliverFun(AckRequired, State),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> queue:in(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- update_ch_record(C#cr{acktags = ChAckTags1,
- unsent_message_count = Count + 1}),
- {Stop, State1}.
-
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
-update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+update_consumer_use(State = #q{consumer_use = CUInfo}, Use) ->
+ State#q{consumer_use = update_consumer_use1(CUInfo, Use)}.
+
+update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
-update_consumer_use({active, _, _} = CUInfo, active) ->
+update_consumer_use1({active, _, _} = CUInfo, active) ->
CUInfo;
-update_consumer_use({active, Since, Avg}, inactive) ->
+update_consumer_use1({active, Since, Avg}, inactive) ->
Now = now_micros(),
{inactive, Now, Now - Since, Avg};
-update_consumer_use({inactive, Since, Active, Avg}, active) ->
+update_consumer_use1({inactive, Since, Active, Avg}, active) ->
Now = now_micros(),
{active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
@@ -607,36 +494,29 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- {_IsEmpty1, State1} = deliver_msgs_to_consumers(
- fun deliver_from_queue_deliver/2,
- is_empty(State), State),
+ {_Active, State1} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ is_empty(State), State),
State1.
-add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
- Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
- {_, P} -> P;
- _ -> 0
- end,
- priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
-
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- case BQ:is_duplicate(Message, BQS) of
- {false, BQS1} ->
- deliver_msgs_to_consumers(
- fun (true, State1 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}};
- (false, State1) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State1)}
- end, false, State#q{backing_queue_state = BQS1});
- {true, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}}
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ false -> deliver_msgs_to_consumers(
+ fun (true, State2 = #q{backing_queue_state = BQS2}) ->
+ true = BQ:is_empty(BQS2),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State2#q{backing_queue_state = BQS3}};
+ (false, State2) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State2)}
+ end, false, State1);
+ true -> {true, State1}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
@@ -652,7 +532,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
@@ -714,85 +594,45 @@ requeue(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
-
-remove_consumers(ChPid, Queue, QName) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
-
-possibly_unblock(State, ChPid, Update) ->
- case lookup_ch(ChPid) of
- not_found -> State;
- C -> C1 = Update(C),
- case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
- false -> update_ch_record(C1),
- State;
- true -> unblock(State, C1)
- end
- end.
-
-unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
- case lists:partition(
- fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
- rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, priority_queue:to_list(C#cr.blocked_consumers)) of
- {_, []} ->
- update_ch_record(C),
+possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
+ case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
+ unchanged ->
State;
- {Blocked, Unblocked} ->
- BlockedQ = priority_queue:from_list(Blocked),
- UnblockedQ = priority_queue:from_list(Unblocked),
- update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- State1 = State#q{consumer_use =
- update_consumer_use(CUInfo, active)},
- AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
- State2 = State1#q{active_consumers = AC1},
- [notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State2) ||
- {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State2)
+ {unblocked, UnblockedCTags, Consumers1} ->
+ [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
+ State) || CTag <- UnblockedCTags],
+ run_message_queue(
+ update_consumer_use(State#q{consumers = Consumers1}, active))
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
+ exclusive_consumer = Holder,
senders = Senders}) ->
- Senders1 = case pmon:is_monitored(DownPid, Senders) of
- false -> Senders;
- true -> credit_flow:peer_down(DownPid),
- pmon:demonitor(DownPid, Senders)
- end,
- case lookup_ch(DownPid) of
+ State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end},
+ case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
- {ok, State#q{senders = Senders1}};
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- blocked_consumers = Blocked} ->
- QName = qname(State),
- _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
- ok = erase_ch_record(C),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers,
- QName),
- senders = Senders1},
- case should_auto_delete(State1) of
- true -> {stop, State1};
- false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
- ensure_expiry_timer(State1))}
+ {ok, State};
+ {ChAckTags, ChCTags, Consumers1} ->
+ QName = qname(State1),
+ [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags],
+ Holder1 = case Holder of
+ {DownPid, _} -> none;
+ Other -> Other
+ end,
+ State2 = State1#q{consumers = Consumers1,
+ exclusive_consumer = Holder1},
+ case should_auto_delete(State2) of
+ true -> {stop, State2};
+ false -> {ok, requeue_and_run(ChAckTags,
+ ensure_expiry_timer(State2))}
end
end.
@@ -806,10 +646,7 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
-
-is_unused(_State) -> consumer_count() == 0.
+is_unused(_State) -> rabbit_queue_consumers:count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -821,23 +658,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
- case lookup_ch(ChPid) of
- not_found ->
- State;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- Fun(State)
- end.
-
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
- case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
+ not_found -> State;
+ ok -> Fun(State)
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -1058,14 +881,14 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
+ rabbit_queue_consumers:unacknowledged_message_count();
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
i(consumers, _) ->
- consumer_count();
+ rabbit_queue_consumers:count();
i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
- case consumer_count() of
+ case rabbit_queue_consumers:count() of
0 -> '';
_ -> case ConsumerUse of
{active, Since, Avg} ->
@@ -1098,17 +921,6 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers}) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(ActiveConsumers, []), all_ch_record()).
-
-consumers(Consumers, Acc) ->
- priority_queue:fold(
- fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
- end, Acc, Consumers).
-
emit_stats(State) ->
emit_stats(State, []).
@@ -1197,8 +1009,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From, State) ->
- reply(consumers(State), State);
+handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
+ reply(rabbit_queue_consumers:all(Consumers), State);
handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
@@ -1223,89 +1035,58 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag}, State2} ->
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} =
- ch_record(ChPid, LimiterPid),
- ChAckTags1 = queue:in(AckTag, ChAckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
- State2;
- false -> State2
- end,
+ {{Message, IsDelivered, AckTag},
+ #q{backing_queue = BQ, backing_queue_state = BQS} = State2} ->
+ case AckRequired of
+ true -> ok = rabbit_queue_consumers:record_ack(
+ ChPid, LimiterPid, AckTag);
+ false -> ok
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, BQ:len(BQS), Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
- _From, State = #q{exclusive_consumer = Holder}) ->
+ _From, State = #q{consumers = Consumers,
+ exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = Count,
- limiter = Limiter} = ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
- end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
- end,
- C1 = update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter2}),
- case is_empty(State) of
- true -> send_drained(C1);
- false -> ok
- end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck,
- args = OtherArgs},
- ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
- AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
- State2 = State1#q{active_consumers = AC1},
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State2),
- reply(ok, run_message_queue(State2))
+ in_use -> reply({error, exclusive_consume_unavailable}, State);
+ ok -> Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ CreditArgs, OtherArgs,
+ is_empty(State), Consumers),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ State1 = State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, qname(State1), OtherArgs),
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
- case lookup_ch(ChPid) of
+ case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
- C = #cr{consumer_count = Count,
- limiter = Limiter,
- blocked_consumers = Blocked} ->
+ Consumers1 ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- Limiter1 = case Count of
- 1 -> rabbit_limiter:deactivate(Limiter);
- _ -> Limiter
- end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
- update_ch_record(C#cr{consumer_count = Count - 1,
- limiter = Limiter2,
- blocked_consumers = Blocked1}),
- State1 = State#q{
- exclusive_consumer = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers)},
+ Holder1 = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ State1 = State#q{consumers = Consumers1,
+ exclusive_consumer = Holder1},
notify_decorators(
basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
@@ -1317,7 +1098,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_expiry_timer(State),
- reply({ok, BQ:len(BQS), consumer_count()}, State1);
+ reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1369,14 +1150,16 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State);
handle_call(force_event_refresh, _From,
- State = #q{exclusive_consumer = Exclusive}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
QName = qname(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
Ch, CTag, false, AckRequired, QName, Args) ||
- {Ch, CTag, AckRequired, Args} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State),
+ {Ch, CTag, AckRequired, Args} <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Args)
end,
@@ -1417,25 +1200,16 @@ handle_cast(delete_immediately, State) ->
stop(State);
handle_cast({resume, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:resume(Limiter)}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(),
+ ChPid, State));
handle_cast({notify_sent, ChPid, Credit}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - Credit}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit),
+ ChPid, State));
handle_cast({activate_limit, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = Limiter}) ->
- C#cr{limiter = rabbit_limiter:activate(Limiter)}
- end));
+ noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
+ ChPid, State));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
@@ -1451,39 +1225,30 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
true = BQ =/= rabbit_mirror_queue_master, %% assertion
BQ1 = rabbit_mirror_queue_master,
BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
BQ = rabbit_mirror_queue_master, %% assertion
{BQ1, BQS1} = BQ:stop_mirroring(BQS),
noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+ backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- C = #cr{limiter = Limiter} = lookup_ch(ChPid),
- C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
- noreply(case Drain andalso Len == 0 of
- true -> update_ch_record(C1),
- send_drained(C1),
- State;
- false -> case is_ch_blocked(C1) of
- true -> update_ch_record(C1),
- State;
- false -> unblock(State, C1)
- end
- end);
+ noreply(possibly_unblock(rabbit_queue_consumers:credit_fun(
+ Len == 0, Credit, Drain, CTag),
+ ChPid, State));
handle_cast(notify_decorators, State) ->
notify_decorators(refresh, [], State),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
new file mode 100644
index 0000000000..549e0ffa39
--- /dev/null
+++ b/src/rabbit_queue_consumers.erl
@@ -0,0 +1,339 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_consumers).
+
+-export([new/0, max_active_priority/1, inactive/1,
+ unacknowledged_message_count/0, erase_ch/2, deliver/5,
+ add/9, remove/3, send_drained/0, record_ack/3, subtract_acks/2,
+ possibly_unblock/3,
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
+ count/0, all/1]).
+
+%%----------------------------------------------------------------------------
+
+-define(UNSENT_MESSAGE_LIMIT, 200).
+
+-record(consumer, {tag, ack_required, args}).
+
+%% These are held in our process dictionary
+-record(cr, {ch_pid,
+ monitor_ref,
+ acktags,
+ consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
+ blocked_consumers,
+ %% The limiter itself
+ limiter,
+ %% Internal flow control for queue -> writer
+ unsent_message_count}).
+
+%%----------------------------------------------------------------------------
+
+new() -> priority_queue:new().
+
+max_active_priority(Consumers) -> priority_queue:highest(Consumers).
+
+inactive(Consumers) -> priority_queue:is_empty(Consumers).
+
+unacknowledged_message_count() ->
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+
+erase_ch(ChPid, Consumers) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = BlockedQ} ->
+ AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ ok = erase_ch_record(C),
+ {queue:to_list(ChAckTags),
+ tags(priority_queue:to_list(AllConsumers)),
+ remove_consumers(ChPid, Consumers)}
+ end.
+
+add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
+ Drained, Consumers) ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, Drain)
+ end,
+ C1 = C#cr{consumer_count = Count + 1,
+ limiter = Limiter2},
+ update_ch_record(case Drained of
+ true -> send_drained(C1);
+ false -> C1
+ end),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
+ add_consumer({ChPid, Consumer}, Consumers).
+
+remove(ChPid, ConsumerTag, Consumers) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
+ Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
+ remove_consumer(ChPid, ConsumerTag, Consumers)
+ end.
+
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()].
+
+record_ack(ChPid, LimiterPid, AckTag) ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
+ update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ ok.
+
+subtract_acks(ChPid, AckTags) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
+ ok
+ end.
+
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+deliver(DeliverFun, Stop, QName, S, Consumers) ->
+ deliver(DeliverFun, Stop, QName, [], S, Consumers).
+
+deliver(_DeliverFun, true, _QName, Blocked, S, Consumers) ->
+ {true, Blocked, S, Consumers};
+deliver( DeliverFun, false, QName, Blocked, S, Consumers) ->
+ case priority_queue:out_p(Consumers) of
+ {empty, _} ->
+ {false, Blocked, S, Consumers};
+ {{value, QEntry, Priority}, Tail} ->
+ {Stop, Blocked1, S1, Consumers1} =
+ deliver1(DeliverFun, QEntry, Priority, QName, Blocked, S, Tail),
+ deliver(DeliverFun, Stop, QName, Blocked1, S1, Consumers1)
+ end.
+
+deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName,
+ Blocked, S, Consumers) ->
+ C = lookup_ch(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
+ {false, Blocked1, S, Consumers};
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
+ {false, Blocked1, S, Consumers};
+ {continue, Limiter} ->
+ {Stop, S1} = deliver1(
+ DeliverFun, Consumer,
+ C#cr{limiter = Limiter}, QName, S),
+ {Stop, Blocked, S1,
+ priority_queue:in(E, Priority, Consumers)}
+ end
+ end.
+
+deliver1(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ QName, S) ->
+ {{Message, IsDelivered, AckTag}, Stop, S1} = DeliverFun(AckRequired, S),
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> queue:in(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ {Stop, S1}.
+
+possibly_unblock(Update, ChPid, Consumers) ->
+ case lookup_ch(ChPid) of
+ not_found -> unchanged;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ unchanged;
+ true -> unblock(C1, Consumers)
+ end
+ end.
+
+unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) ->
+ case lists:partition(
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, priority_queue:to_list(BlockedQ)) of
+ {_, []} ->
+ update_ch_record(C),
+ unchanged;
+ {Blocked, Unblocked} ->
+ BlockedQ1 = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
+ {unblocked,
+ tags(Unblocked),
+ priority_queue:join(Consumers, UnblockedQ)}
+ end.
+
+resume_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end.
+
+notify_sent_fun(Credit) ->
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - Credit}
+ end.
+
+activate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end.
+
+credit_fun(IsEmpty, Credit, Drain, CTag) ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C1 = C#cr{limiter = rabbit_limiter:credit(
+ Limiter, CTag, Credit, Drain)},
+ case Drain andalso IsEmpty of
+ true -> send_drained(C1);
+ false -> C1
+ end
+ end.
+
+count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+all(Consumers) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(Consumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
+ end, Acc, Consumers).
+
+%%----------------------------------------------------------------------------
+
+lookup_ch(ChPid) ->
+ case get({ch, ChPid}) of
+ undefined -> not_found;
+ C -> C
+ end.
+
+ch_record(ChPid, LimiterPid) ->
+ Key = {ch, ChPid},
+ case get(Key) of
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
+ C = #cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = queue:new(),
+ consumer_count = 0,
+ blocked_consumers = priority_queue:new(),
+ limiter = Limiter,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
+ C = #cr{} -> C
+ end.
+
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
+
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
+
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
+
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> C;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ C#cr{limiter = Limiter2}
+ end.
+
+tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
+
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, Queue).
+
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
+ end, Queue).
+
+remove_consumers(ChPid, Queue) ->
+ priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
+ (_) -> true
+ end, Queue).