diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-23 17:09:41 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-12-23 17:09:41 +0000 |
| commit | 8e4a15d5ba40deeb1f138e657b342b48e35548bb (patch) | |
| tree | 76a162df874033b68773ba7e3db6d6039b22884b | |
| parent | 914f214e95f93f44063f3590fea636b873e8050b (diff) | |
| download | rabbitmq-server-git-8e4a15d5ba40deeb1f138e657b342b48e35548bb.tar.gz | |
extract all channel and consumer related queue functionality
design decisions:
- all limiter interactions take place in the new module. The
alternative would turn into spaghetti and leave more code in
amqqueue_process.
- asynchronous channel interactions - namely delivery of messages and
'drained' events - happens in the new module whereas all other
operations - really just sending of replies - takes place in
amqqueue_process. I'm still in two minds about that. It would
actually be simpler to have the rabbit_channel:deliver code in
amqqueue_process, albeit in an existing closure, but send_drained is
rather deeply embedded.
- we need a few functions in the API that manipulate the channel
record. They all require the same kind of post-processing to invoke
decorators, run the message queue, etc. I decided to make this
pattern explicit in the API by having just one function there -
queue_consumers:possibly_unblock - that takes a closure. It has just
one call site - amqqueue_process:possibly_unblock - that contains
the post-processing logic. The closures are constructed by four
separate API functions.
There are a few complications...
1) event emission and decorator notifications
These are deeply embedded in some places and need to be lifted out so
they are all in the one logical place where they belong, namely
amqqueue_process. In some cases this complicates the API and code a
fair bit since we, e.g., need to collate information for subsequent
decorator notification. This is doubly annoying since the existing
decorators don't actually make use of that information, so we should
really consider simplifying the decorator API.
2) message delivery logic
This is a fairly complex piece of code and it gets more complicated
still because a) of the above, and b) sending a message to a channel
requires the queue name, so we need to pass that in. We also need the
state, and indeed modify it, though this is done opaquely. And it
looks like we could just pass BQ & BQS instead.
3) handle_cast/credit
I reckon this should have been implemented with possibly_unblock. It
is now. The behaviour isn't obviously equivalent, so this needs
careful consideration. The old code would invoke 'unblock' when "(not
Drain or Len /= 0) and updated channel record is not blocked". By
contrast, the possibly_unblock logic, which is what the new code
invokes, invokes 'unblock' when the old channel record is blocked and
the updated channel record is unblocked. This would seem to be
logical, and it would be weird indeed if the credit code relied on any
different behaviour.
4) queue_consumer:add is a nine-arg function. urgh. Mind you, so is
the handle_call/basic_consume message.
There are some minimal refactors and cosmetic changes which should go
straight onto 'default' and thus reduce the distance to changes in
this bug.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 531 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 339 |
2 files changed, 487 insertions, 383 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7002fd367c..d9b9e92ac5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,6 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -38,7 +37,7 @@ has_had_consumers, backing_queue, backing_queue_state, - active_consumers, + consumers, consumer_use, expires, sync_timer_ref, @@ -57,21 +56,6 @@ status }). --record(consumer, {tag, ack_required, args}). - -%% These are held in our process dictionary --record(cr, {ch_pid, - monitor_ref, - acktags, - consumer_count, - %% Queue of {ChPid, #consumer{}} for consumers which have - %% been blocked for any reason - blocked_consumers, - %% The limiter itself - limiter, - %% Internal flow control for queue -> writer - unsent_message_count}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -150,7 +134,7 @@ init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - active_consumers = priority_queue:new(), + consumers = rabbit_queue_consumers:new(), consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), @@ -235,13 +219,14 @@ notify_decorators(Event, Props, State) when Event =:= startup; Event =:= shutdown -> decorator_callback(qname(State), Event, Props); -notify_decorators(Event, Props, State = #q{active_consumers = ACs, +notify_decorators(Event, Props, State = #q{consumers = Consumers, backing_queue = BQ, backing_queue_state = BQS}) -> - decorator_callback( - qname(State), notify, - [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)}, - {is_empty, BQ:is_empty(BQS)} | Props]]). + P = rabbit_queue_consumers:max_active_priority(Consumers), + decorator_callback(qname(State), notify, + [Event, [{max_active_consumer_priority, P}, + {is_empty, BQ:is_empty(BQS)} | + Props]]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -315,7 +300,7 @@ init_max_length(MaxLen, State) -> State1. terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, fun stop_rate_timer/1, @@ -326,8 +311,9 @@ terminate_shutdown(Fun, State) -> _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), notify_decorators(shutdown, [], State), - [emit_consumer_deleted(Ch, CTag, QName) - || {Ch, CTag, _} <- consumers(State1)], + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _} <- + rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -410,145 +396,46 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{active_consumers = AC}) -> - true = (priority_queue:is_empty(AC) orelse is_empty(State)). +assert_invariant(State = #q{consumers = Consumers}) -> + true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). -lookup_ch(ChPid) -> - case get({ch, ChPid}) of - undefined -> not_found; - C -> C - end. - -ch_record(ChPid, LimiterPid) -> - Key = {ch, ChPid}, - case get(Key) of - undefined -> MonitorRef = erlang:monitor(process, ChPid), - Limiter = rabbit_limiter:client(LimiterPid), - C = #cr{ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = queue:new(), - consumer_count = 0, - blocked_consumers = priority_queue:new(), - limiter = Limiter, - unsent_message_count = 0}, - put(Key, C), - C; - C = #cr{} -> C - end. - -update_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of - {true, 0, 0} -> ok = erase_ch_record(C); - _ -> ok = store_ch_record(C) - end, - C. - -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C), - ok. - -erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), - ok. - -all_ch_record() -> [C || {{ch, _}, C} <- get()]. - -block_consumer(C = #cr{blocked_consumers = Blocked}, - {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> - update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}), - notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). - -is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). - maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of true -> notify_decorators(queue_empty, [], State), - [send_drained(C) || C <- all_ch_record()]; + rabbit_queue_consumers:send_drained(); false -> ok end, State. -send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> - case rabbit_limiter:drained(Limiter) of - {[], Limiter} -> ok; - {CTagCredit, Limiter2} -> rabbit_channel:send_drained( - ChPid, CTagCredit), - update_ch_record(C#cr{limiter = Limiter2}) +deliver_msgs_to_consumers(DeliverFun, Stop, State) -> + {Active, Blocked, State1, Consumers1} = + rabbit_queue_consumers:deliver(DeliverFun, Stop, qname(State), State, + State#q.consumers), + State2 = State1#q{consumers = Consumers1}, + [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) || + {_ChPid, CTag} <- Blocked], + case Active of + true -> {true, State2}; + false -> {false, update_consumer_use(State2, inactive)} end. -deliver_msgs_to_consumers(_DeliverFun, true, State) -> - {true, State}; -deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers, - consumer_use = CUInfo}) -> - case priority_queue:out_p(ActiveConsumers) of - {empty, _} -> - {false, - State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; - {{value, QEntry, Priority}, Tail} -> - {Stop, State1} = deliver_msg_to_consumer( - DeliverFun, QEntry, Priority, - State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(DeliverFun, Stop, State1) - end. - -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> - C = lookup_ch(ChPid), - case is_ch_blocked(C) of - true -> block_consumer(C, E, State), - {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required, - Consumer#consumer.tag) of - {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E, State), - {false, State}; - {continue, Limiter} -> - AC1 = priority_queue:in(E, Priority, - State#q.active_consumers), - deliver_msg_to_consumer0( - DeliverFun, Consumer, C#cr{limiter = Limiter}, - State#q{active_consumers = AC1}) - end - end. - -deliver_msg_to_consumer0(DeliverFun, - #consumer{tag = ConsumerTag, - ack_required = AckRequired}, - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, Stop, State1} = - DeliverFun(AckRequired, State), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); - false -> ChAckTags - end, - update_ch_record(C#cr{acktags = ChAckTags1, - unsent_message_count = Count + 1}), - {Stop, State1}. - deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. -update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> +update_consumer_use(State = #q{consumer_use = CUInfo}, Use) -> + State#q{consumer_use = update_consumer_use1(CUInfo, Use)}. + +update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; -update_consumer_use({active, _, _} = CUInfo, active) -> +update_consumer_use1({active, _, _} = CUInfo, active) -> CUInfo; -update_consumer_use({active, Since, Avg}, inactive) -> +update_consumer_use1({active, Since, Avg}, inactive) -> Now = now_micros(), {inactive, Now, Now - Since, Avg}; -update_consumer_use({inactive, Since, Active, Avg}, active) -> +update_consumer_use1({inactive, Since, Active, Avg}, active) -> Now = now_micros(), {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. @@ -607,36 +494,29 @@ discard(#delivery{sender = SenderPid, State1#q{backing_queue_state = BQS1}. run_message_queue(State) -> - {_IsEmpty1, State1} = deliver_msgs_to_consumers( - fun deliver_from_queue_deliver/2, - is_empty(State), State), + {_Active, State1} = deliver_msgs_to_consumers( + fun deliver_from_queue_deliver/2, + is_empty(State), State), State1. -add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> - Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of - {_, P} -> P; - _ -> 0 - end, - priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers). - attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_duplicate(Message, BQS) of - {false, BQS1} -> - deliver_msgs_to_consumers( - fun (true, State1 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}}; - (false, State1) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State1)} - end, false, State#q{backing_queue_state = BQS1}); - {true, BQS1} -> - {true, State#q{backing_queue_state = BQS1}} + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + false -> deliver_msgs_to_consumers( + fun (true, State2 = #q{backing_queue_state = BQS2}) -> + true = BQ:is_empty(BQS2), + {AckTag, BQS3} = BQ:publish_delivered( + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State2#q{backing_queue_state = BQS3}}; + (false, State2) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State2)} + end, false, State1); + true -> {true, State1} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, @@ -652,7 +532,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), + maybe_drop_head(State2#q{backing_queue_state = BQS1}), QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so @@ -714,85 +594,45 @@ requeue(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) - end, Queue). - -remove_consumers(ChPid, Queue, QName) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag, QName), - false; - (_) -> - true - end, Queue). - -possibly_unblock(State, ChPid, Update) -> - case lookup_ch(ChPid) of - not_found -> State; - C -> C1 = Update(C), - case is_ch_blocked(C) andalso not is_ch_blocked(C1) of - false -> update_ch_record(C1), - State; - true -> unblock(State, C1) - end - end. - -unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> - case lists:partition( - fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> - rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, priority_queue:to_list(C#cr.blocked_consumers)) of - {_, []} -> - update_ch_record(C), +possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> + case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of + unchanged -> State; - {Blocked, Unblocked} -> - BlockedQ = priority_queue:from_list(Blocked), - UnblockedQ = priority_queue:from_list(Unblocked), - update_ch_record(C#cr{blocked_consumers = BlockedQ}), - State1 = State#q{consumer_use = - update_consumer_use(CUInfo, active)}, - AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), - State2 = State1#q{active_consumers = AC1}, - [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State2) || - {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State2) + {unblocked, UnblockedCTags, Consumers1} -> + [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], + State) || CTag <- UnblockedCTags], + run_message_queue( + update_consumer_use(State#q{consumers = Consumers1}, active)) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, +handle_ch_down(DownPid, State = #q{consumers = Consumers, + exclusive_consumer = Holder, senders = Senders}) -> - Senders1 = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) - end, - case lookup_ch(DownPid) of + State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end}, + case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of not_found -> - {ok, State#q{senders = Senders1}}; - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - blocked_consumers = Blocked} -> - QName = qname(State), - _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission - ok = erase_ch_record(C), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers, - QName), - senders = Senders1}, - case should_auto_delete(State1) of - true -> {stop, State1}; - false -> {ok, requeue_and_run(queue:to_list(ChAckTags), - ensure_expiry_timer(State1))} + {ok, State}; + {ChAckTags, ChCTags, Consumers1} -> + QName = qname(State1), + [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags], + Holder1 = case Holder of + {DownPid, _} -> none; + Other -> Other + end, + State2 = State1#q{consumers = Consumers1, + exclusive_consumer = Holder1}, + case should_auto_delete(State2) of + true -> {stop, State2}; + false -> {ok, requeue_and_run(ChAckTags, + ensure_expiry_timer(State2))} end end. @@ -806,10 +646,7 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -consumer_count() -> - lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). - -is_unused(_State) -> consumer_count() == 0. +is_unused(_State) -> rabbit_queue_consumers:count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -821,23 +658,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ, State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> - case lookup_ch(ChPid) of - not_found -> - State; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - Fun(State) - end. - -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> - case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of + not_found -> State; + ok -> Fun(State) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -1058,14 +881,14 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]); + rabbit_queue_consumers:unacknowledged_message_count(); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); i(consumers, _) -> - consumer_count(); + rabbit_queue_consumers:count(); i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> - case consumer_count() of + case rabbit_queue_consumers:count() of 0 -> ''; _ -> case ConsumerUse of {active, Since, Avg} -> @@ -1098,17 +921,6 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers}) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(ActiveConsumers, []), all_ch_record()). - -consumers(Consumers, Acc) -> - priority_queue:fold( - fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] - end, Acc, Consumers). - emit_stats(State) -> emit_stats(State, []). @@ -1197,8 +1009,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, State) -> - reply(consumers(State), State); +handle_call(consumers, _From, State = #q{consumers = Consumers}) -> + reply(rabbit_queue_consumers:all(Consumers), State); handle_call({deliver, Delivery, Delivered}, From, State) -> %% Synchronous, "mandatory" deliver mode. @@ -1223,89 +1035,58 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag}, State2} -> - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = - ch_record(ChPid, LimiterPid), - ChAckTags1 = queue:in(AckTag, ChAckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), - State2; - false -> State2 - end, + {{Message, IsDelivered, AckTag}, + #q{backing_queue = BQ, backing_queue_state = BQS} = State2} -> + case AckRequired of + true -> ok = rabbit_queue_consumers:record_ack( + ChPid, LimiterPid, AckTag); + false -> ok + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, BQ:len(BQS), Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + _From, State = #q{consumers = Consumers, + exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = Count, - limiter = Limiter} = ch_record(ChPid, LimiterPid), - Limiter1 = case LimiterActive of - true -> rabbit_limiter:activate(Limiter); - false -> Limiter - end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) - end, - C1 = update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter2}), - case is_empty(State) of - true -> send_drained(C1); - false -> ok - end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck, - args = OtherArgs}, - ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), - AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), - State2 = State1#q{active_consumers = AC1}, - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State2), - reply(ok, run_message_queue(State2)) + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + CreditArgs, OtherArgs, + is_empty(State), Consumers), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, qname(State1), OtherArgs), + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State1), + reply(ok, run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), - case lookup_ch(ChPid) of + case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); - C = #cr{consumer_count = Count, - limiter = Limiter, - blocked_consumers = Blocked} -> + Consumers1 -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - Limiter1 = case Count of - 1 -> rabbit_limiter:deactivate(Limiter); - _ -> Limiter - end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), - update_ch_record(C#cr{consumer_count = Count - 1, - limiter = Limiter2, - blocked_consumers = Blocked1}), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers)}, + Holder1 = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + State1 = State#q{consumers = Consumers1, + exclusive_consumer = Holder1}, notify_decorators( basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of @@ -1317,7 +1098,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_expiry_timer(State), - reply({ok, BQ:len(BQS), consumer_count()}, State1); + reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1369,14 +1150,16 @@ handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); handle_call(force_event_refresh, _From, - State = #q{exclusive_consumer = Exclusive}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), QName = qname(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( Ch, CTag, false, AckRequired, QName, Args) || - {Ch, CTag, AckRequired, Args} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State), + {Ch, CTag, AckRequired, Args} <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Args) end, @@ -1417,25 +1200,16 @@ handle_cast(delete_immediately, State) -> stop(State); handle_cast({resume, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:resume(Limiter)} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(), + ChPid, State)); handle_cast({notify_sent, ChPid, Credit}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - Credit} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit), + ChPid, State)); handle_cast({activate_limit, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:activate(Limiter)} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), + ChPid, State)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), @@ -1451,39 +1225,30 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq {ok, Q} = rabbit_amqqueue:lookup(qname(State)), true = BQ =/= rabbit_mirror_queue_master, %% assertion BQ1 = rabbit_mirror_queue_master, BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> BQ = rabbit_mirror_queue_master, %% assertion {BQ1, BQS1} = BQ:stop_mirroring(BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - C = #cr{limiter = Limiter} = lookup_ch(ChPid), - C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, - noreply(case Drain andalso Len == 0 of - true -> update_ch_record(C1), - send_drained(C1), - State; - false -> case is_ch_blocked(C1) of - true -> update_ch_record(C1), - State; - false -> unblock(State, C1) - end - end); + noreply(possibly_unblock(rabbit_queue_consumers:credit_fun( + Len == 0, Credit, Drain, CTag), + ChPid, State)); handle_cast(notify_decorators, State) -> notify_decorators(refresh, [], State), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl new file mode 100644 index 0000000000..549e0ffa39 --- /dev/null +++ b/src/rabbit_queue_consumers.erl @@ -0,0 +1,339 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_queue_consumers). + +-export([new/0, max_active_priority/1, inactive/1, + unacknowledged_message_count/0, erase_ch/2, deliver/5, + add/9, remove/3, send_drained/0, record_ack/3, subtract_acks/2, + possibly_unblock/3, + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, + count/0, all/1]). + +%%---------------------------------------------------------------------------- + +-define(UNSENT_MESSAGE_LIMIT, 200). + +-record(consumer, {tag, ack_required, args}). + +%% These are held in our process dictionary +-record(cr, {ch_pid, + monitor_ref, + acktags, + consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason + blocked_consumers, + %% The limiter itself + limiter, + %% Internal flow control for queue -> writer + unsent_message_count}). + +%%---------------------------------------------------------------------------- + +new() -> priority_queue:new(). + +max_active_priority(Consumers) -> priority_queue:highest(Consumers). + +inactive(Consumers) -> priority_queue:is_empty(Consumers). + +unacknowledged_message_count() -> + lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + +erase_ch(ChPid, Consumers) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = BlockedQ} -> + AllConsumers = priority_queue:join(Consumers, BlockedQ), + ok = erase_ch_record(C), + {queue:to_list(ChAckTags), + tags(priority_queue:to_list(AllConsumers)), + remove_consumers(ChPid, Consumers)} + end. + +add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, + Drained, Consumers) -> + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = C#cr{consumer_count = Count + 1, + limiter = Limiter2}, + update_ch_record(case Drained of + true -> send_drained(C1); + false -> C1 + end), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck, + args = OtherArgs}, + add_consumer({ChPid, Consumer}, Consumers). + +remove(ChPid, ConsumerTag, Consumers) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> + Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), + remove_consumer(ChPid, ConsumerTag, Consumers) + end. + +send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()]. + +record_ack(ChPid, LimiterPid, AckTag) -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), + update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + ok. + +subtract_acks(ChPid, AckTags) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), + ok + end. + +subtract_acks([], [], AckQ) -> + AckQ; +subtract_acks([], Prefix, AckQ) -> + queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); +subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> + case queue:out(AckQ) of + {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + end. + +deliver(DeliverFun, Stop, QName, S, Consumers) -> + deliver(DeliverFun, Stop, QName, [], S, Consumers). + +deliver(_DeliverFun, true, _QName, Blocked, S, Consumers) -> + {true, Blocked, S, Consumers}; +deliver( DeliverFun, false, QName, Blocked, S, Consumers) -> + case priority_queue:out_p(Consumers) of + {empty, _} -> + {false, Blocked, S, Consumers}; + {{value, QEntry, Priority}, Tail} -> + {Stop, Blocked1, S1, Consumers1} = + deliver1(DeliverFun, QEntry, Priority, QName, Blocked, S, Tail), + deliver(DeliverFun, Stop, QName, Blocked1, S1, Consumers1) + end. + +deliver1(DeliverFun, E = {ChPid, Consumer}, Priority, QName, + Blocked, S, Consumers) -> + C = lookup_ch(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], + {false, Blocked1, S, Consumers}; + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], + {false, Blocked1, S, Consumers}; + {continue, Limiter} -> + {Stop, S1} = deliver1( + DeliverFun, Consumer, + C#cr{limiter = Limiter}, QName, S), + {Stop, Blocked, S1, + priority_queue:in(E, Priority, Consumers)} + end + end. + +deliver1(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + QName, S) -> + {{Message, IsDelivered, AckTag}, Stop, S1} = DeliverFun(AckRequired, S), + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> queue:in(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + {Stop, S1}. + +possibly_unblock(Update, ChPid, Consumers) -> + case lookup_ch(ChPid) of + not_found -> unchanged; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + unchanged; + true -> unblock(C1, Consumers) + end + end. + +unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, Consumers) -> + case lists:partition( + fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, priority_queue:to_list(BlockedQ)) of + {_, []} -> + update_ch_record(C), + unchanged; + {Blocked, Unblocked} -> + BlockedQ1 = priority_queue:from_list(Blocked), + UnblockedQ = priority_queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ1}), + {unblocked, + tags(Unblocked), + priority_queue:join(Consumers, UnblockedQ)} + end. + +resume_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end. + +notify_sent_fun(Credit) -> + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - Credit} + end. + +activate_limit_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end. + +credit_fun(IsEmpty, Credit, Drain, CTag) -> + fun (C = #cr{limiter = Limiter}) -> + C1 = C#cr{limiter = rabbit_limiter:credit( + Limiter, CTag, Credit, Drain)}, + case Drain andalso IsEmpty of + true -> send_drained(C1); + false -> C1 + end + end. + +count() -> + lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + +all(Consumers) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(Consumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> + priority_queue:fold( + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] + end, Acc, Consumers). + +%%---------------------------------------------------------------------------- + +lookup_ch(ChPid) -> + case get({ch, ChPid}) of + undefined -> not_found; + C -> C + end. + +ch_record(ChPid, LimiterPid) -> + Key = {ch, ChPid}, + case get(Key) of + undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), + C = #cr{ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = queue:new(), + consumer_count = 0, + blocked_consumers = priority_queue:new(), + limiter = Limiter, + unsent_message_count = 0}, + put(Key, C), + C; + C = #cr{} -> C + end. + +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> + case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + {true, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) + end, + C. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. + +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + +all_ch_record() -> [C || {{ch, _}, C} <- get()]. + +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}). + +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> C; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + C#cr{limiter = Limiter2} + end. + +tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList]. + +add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> + Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, P} -> P; + _ -> 0 + end, + priority_queue:in({ChPid, Consumer}, Priority, Queue). + +remove_consumer(ChPid, ConsumerTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) + end, Queue). + +remove_consumers(ChPid, Queue) -> + priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; + (_) -> true + end, Queue). |
