summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:58:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:58:25 +0000
commitab21c6474f680a1a90cf0906fbfcbc7a932b369b (patch)
treebe1da9be3d373fd78cd9a5fa35ef29f3e5a0dbdf
parent53434e868baaad58076cd4d2b899a3a9f262d754 (diff)
downloadrabbitmq-server-git-ab21c6474f680a1a90cf0906fbfcbc7a932b369b.tar.gz
Simplfy (although perhaps not as much as we had hoped) by only informing queue decorators when the active consumers may have changed, rather than trying to give them more information about what is happening.
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_queue_consumers.erl31
-rw-r--r--src/rabbit_queue_decorator.erl11
3 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 281aecb93c..7597ec9d84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
@@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -213,18 +213,16 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+notify_decorators(Event, State) ->
+ decorator_callback(qname(State), Event, []).
-notify_decorators(Event, Props, State = #q{consumers = Consumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+ decorator_callback(qname(State), active_consumers_changed,
+ [[{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)}]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -308,7 +306,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _} <-
rabbit_queue_consumers:all(Consumers)],
@@ -401,7 +399,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
+ true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
@@ -412,8 +410,10 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
Consumers),
State2 = State1#q{consumers = Consumers1},
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
- {_ChPid, CTag} <- Blocked],
+ case Blocked of
+ true -> notify_decorators(State2);
+ false -> ok
+ end,
{Active, State2}.
confirm_messages([], State) ->
@@ -568,10 +568,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
unchanged ->
State;
- {unblocked, UnblockedCTags, Consumers1} ->
+ {unblocked, Consumers1} ->
State1 = State#q{consumers = Consumers1},
- [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State1) || CTag <- UnblockedCTags],
+ notify_decorators(State1),
run_message_queue(State1)
end.
@@ -599,8 +598,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- ChCTags],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
@@ -1034,8 +1032,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1054,8 +1051,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1218,7 +1214,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
ChPid, State));
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 702091dca9..19b68cac66 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -82,8 +82,7 @@
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -182,42 +181,41 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, [], S, State).
+ deliver(FetchFun, Stop, QName, false, S, State).
-deliver(_FetchFun, true, _QName, Blocked, S, State) ->
- {true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S,
+deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) ->
+ {true, NewlyBlocked, S, State};
+deliver( FetchFun, false, QName, NewlyBlocked, S,
State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ Use1 = update_use(Use, inactive),
+ {false, NewlyBlocked, S, State#state{use = Use1}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, Blocked1, S1, Consumers1} =
+ {Stop, NewlyBlocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1,
+ NewlyBlocked, S, Tail),
+ deliver(FetchFun, Stop, QName, NewlyBlocked1, S1,
State#state{consumers = Consumers1})
end.
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+ NewlyBlocked, S, Consumers) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
{continue, Limiter} ->
{Stop, S1} = deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName, S),
- {Stop, Blocked, S1,
+ {Stop, NewlyBlocked, S1,
priority_queue:in(E, Priority, Consumers)}
end
end.
@@ -290,7 +288,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a504..b3c02403be 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,7 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+-callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'.
-else.
@@ -32,7 +25,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {active_consumers_changed, 2}];
behaviour_info(_Other) ->
undefined.