summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-05 17:26:02 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-05 17:26:02 +0100
commitff94ad3c3b91eda8e42e7aea4f6094d75a2bb48b (patch)
treeda22b4ba43027d3cc1198a31ce6e19eedf248e8c /src
parentb6085dd103e49734ec1a2cc08b15cac125939221 (diff)
downloadrabbitmq-server-git-ff94ad3c3b91eda8e42e7aea4f6094d75a2bb48b.tar.gz
Extend queue decorator interface, break dependency of server on federation. This is still a bit ugly, bits of queue state leak out in the events at the moment, and events are still exactly those needed by federation rather than a carefully thought out set. But it works.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_queue_decorator.erl9
3 files changed, 41 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5331a58462..c5789f8a58 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4, notify_federation/1]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -156,7 +156,7 @@
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
--spec(notify_federation/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
@@ -569,8 +569,8 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
-notify_federation(#amqqueue{pid = QPid}) ->
- delegate:cast(QPid, notify_federation).
+notify_decorators(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_decorators).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 893659d4e7..6624b5d264 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -198,7 +198,7 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- rabbit_federation_queue:maybe_start(Q),
+ callback(qname(State), startup, []),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -223,6 +223,22 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
+callback(QName, F, A) ->
+ %% Look up again in case policy and hence decorators have changed
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q = #amqqueue{decorators = Ds}} ->
+ [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
+ {error, not_found} ->
+ ok
+ end.
+
+notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ callback(qname(State), notify,
+ [Event, [{active_consumers, ACs},
+ {is_empty, BQ:is_empty(BQS)} | Props]]).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover =/= new,
@@ -265,7 +281,7 @@ init_dlx_routing_key(RoutingKey, State) ->
init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
-terminate_shutdown(Fun, State = #q{q = Q}) ->
+terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
@@ -276,7 +292,7 @@ terminate_shutdown(Fun, State = #q{q = Q}) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- rabbit_federation_queue:maybe_stop(Q),
+ callback(QName, shutdown, []),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -407,9 +423,10 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
-block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry, State) ->
Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked),
- update_ch_record(C#cr{blocked_consumers = Blocked1}).
+ update_ch_record(C#cr{blocked_consumers = Blocked1}),
+ notify_decorators(consumer_blocked, [{q_entry, QEntry}], State).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
@@ -446,15 +463,13 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
- notify_federation(State),
+ true -> block_consumer(C, E, State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E),
- notify_federation(State),
+ block_consumer(C#cr{limiter = Limiter}, E, State),
{false, State};
{continue, Limiter} ->
AC1 = priority_queue:in(E, consumer_priority(E),
@@ -537,22 +552,9 @@ run_message_queue(State) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
- notify_federation(State1),
+ notify_decorators(queue_finished, [], State1),
State1.
-notify_federation(#q{q = Q,
- active_consumers = ActiveConsumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- IsEmpty = BQ:is_empty(BQS),
- case IsEmpty andalso active_unfederated(ActiveConsumers) of
- true -> rabbit_federation_queue:run(Q);
- false -> rabbit_federation_queue:pause(Q)
- end.
-
-active_unfederated(Cs) ->
- not priority_queue:is_empty(Cs) andalso priority_queue:highest(Cs) >= 0.
-
consumer_priority({_ChPid, #consumer{args = Args}}) ->
case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
{_, Priority} -> Priority;
@@ -1227,7 +1229,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
- notify_federation(State1),
+ notify_decorators(
+ basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1404,8 +1407,8 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
-handle_cast(notify_federation, State) ->
- notify_federation(State),
+handle_cast(notify_decorators, State) ->
+ notify_decorators(on_demand, [], State),
noreply(State);
handle_cast(wake_up, State) ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 9ea13beb0d..d3dec9dc91 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,17 +8,24 @@
-ifdef(use_specs).
+-callback startup(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+
-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
'ok'.
-callback active_for(rabbit_types:amqqueue()) -> boolean().
+-callback notify(rabbit_types:amqqueue(), atom(), any()) -> 'ok'.
+
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {active_for, 1}, {policy_changed, 2}];
+ [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ {active_for, 1}, {notify, 3}];
behaviour_info(_Other) ->
undefined.