summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_queue_consumers.erl31
-rw-r--r--src/rabbit_queue_decorator.erl11
3 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 281aecb93c..7597ec9d84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
@@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -213,18 +213,16 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+notify_decorators(Event, State) ->
+ decorator_callback(qname(State), Event, []).
-notify_decorators(Event, Props, State = #q{consumers = Consumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+ decorator_callback(qname(State), active_consumers_changed,
+ [[{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)}]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -308,7 +306,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _} <-
rabbit_queue_consumers:all(Consumers)],
@@ -401,7 +399,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
+ true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
@@ -412,8 +410,10 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
Consumers),
State2 = State1#q{consumers = Consumers1},
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
- {_ChPid, CTag} <- Blocked],
+ case Blocked of
+ true -> notify_decorators(State2);
+ false -> ok
+ end,
{Active, State2}.
confirm_messages([], State) ->
@@ -568,10 +568,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
unchanged ->
State;
- {unblocked, UnblockedCTags, Consumers1} ->
+ {unblocked, Consumers1} ->
State1 = State#q{consumers = Consumers1},
- [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State1) || CTag <- UnblockedCTags],
+ notify_decorators(State1),
run_message_queue(State1)
end.
@@ -599,8 +598,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- ChCTags],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
@@ -1034,8 +1032,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1054,8 +1051,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1218,7 +1214,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
ChPid, State));
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 702091dca9..19b68cac66 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -82,8 +82,7 @@
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -182,42 +181,41 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, [], S, State).
+ deliver(FetchFun, Stop, QName, false, S, State).
-deliver(_FetchFun, true, _QName, Blocked, S, State) ->
- {true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S,
+deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) ->
+ {true, NewlyBlocked, S, State};
+deliver( FetchFun, false, QName, NewlyBlocked, S,
State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ Use1 = update_use(Use, inactive),
+ {false, NewlyBlocked, S, State#state{use = Use1}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, Blocked1, S1, Consumers1} =
+ {Stop, NewlyBlocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1,
+ NewlyBlocked, S, Tail),
+ deliver(FetchFun, Stop, QName, NewlyBlocked1, S1,
State#state{consumers = Consumers1})
end.
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+ NewlyBlocked, S, Consumers) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
{continue, Limiter} ->
{Stop, S1} = deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName, S),
- {Stop, Blocked, S1,
+ {Stop, NewlyBlocked, S1,
priority_queue:in(E, Priority, Consumers)}
end
end.
@@ -290,7 +288,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a504..b3c02403be 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,7 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+-callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'.
-else.
@@ -32,7 +25,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {active_consumers_changed, 2}];
behaviour_info(_Other) ->
undefined.