diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 17:26:02 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 17:26:02 +0100 |
| commit | ff94ad3c3b91eda8e42e7aea4f6094d75a2bb48b (patch) | |
| tree | da22b4ba43027d3cc1198a31ce6e19eedf248e8c /src | |
| parent | b6085dd103e49734ec1a2cc08b15cac125939221 (diff) | |
| download | rabbitmq-server-git-ff94ad3c3b91eda8e42e7aea4f6094d75a2bb48b.tar.gz | |
Extend queue decorator interface, break dependency of server on federation. This is still a bit ugly, bits of queue state leak out in the events at the moment, and events are still exactly those needed by federation rather than a carefully thought out set. But it works.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_queue_decorator.erl | 9 |
3 files changed, 41 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5331a58462..c5789f8a58 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/10, basic_cancel/4, notify_federation/1]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -156,7 +156,7 @@ -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). --spec(notify_federation/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). @@ -569,8 +569,8 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). -notify_federation(#amqqueue{pid = QPid}) -> - delegate:cast(QPid, notify_federation). +notify_decorators(#amqqueue{pid = QPid}) -> + delegate:cast(QPid, notify_decorators). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 893659d4e7..6624b5d264 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -198,7 +198,7 @@ declare(Recover, From, State = #q{q = Q, recovery_barrier(Recover), State1 = process_args(State#q{backing_queue = BQ, backing_queue_state = BQS}), - rabbit_federation_queue:maybe_start(Q), + callback(qname(State), startup, []), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -223,6 +223,22 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. +callback(QName, F, A) -> + %% Look up again in case policy and hence decorators have changed + case rabbit_amqqueue:lookup(QName) of + {ok, Q = #amqqueue{decorators = Ds}} -> + [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)]; + {error, not_found} -> + ok + end. + +notify_decorators(Event, Props, State = #q{active_consumers = ACs, + backing_queue = BQ, + backing_queue_state = BQS}) -> + callback(qname(State), notify, + [Event, [{active_consumers, ACs}, + {is_empty, BQ:is_empty(BQS)} | Props]]). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover =/= new, @@ -265,7 +281,7 @@ init_dlx_routing_key(RoutingKey, State) -> init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. -terminate_shutdown(Fun, State = #q{q = Q}) -> +terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, @@ -276,7 +292,7 @@ terminate_shutdown(Fun, State = #q{q = Q}) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - rabbit_federation_queue:maybe_stop(Q), + callback(QName, shutdown, []), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} @@ -407,9 +423,10 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry, State) -> Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked), - update_ch_record(C#cr{blocked_consumers = Blocked1}). + update_ch_record(C#cr{blocked_consumers = Blocked1}), + notify_decorators(consumer_blocked, [{q_entry, QEntry}], State). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). @@ -446,15 +463,13 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), - notify_federation(State), + true -> block_consumer(C, E, State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E), - notify_federation(State), + block_consumer(C#cr{limiter = Limiter}, E, State), {false, State}; {continue, Limiter} -> AC1 = priority_queue:in(E, consumer_priority(E), @@ -537,22 +552,9 @@ run_message_queue(State) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, is_empty(State), State), - notify_federation(State1), + notify_decorators(queue_finished, [], State1), State1. -notify_federation(#q{q = Q, - active_consumers = ActiveConsumers, - backing_queue = BQ, - backing_queue_state = BQS}) -> - IsEmpty = BQ:is_empty(BQS), - case IsEmpty andalso active_unfederated(ActiveConsumers) of - true -> rabbit_federation_queue:run(Q); - false -> rabbit_federation_queue:pause(Q) - end. - -active_unfederated(Cs) -> - not priority_queue:is_empty(Cs) andalso priority_queue:highest(Cs) >= 0. - consumer_priority({_ChPid, #consumer{args = Args}}) -> case rabbit_misc:table_lookup(Args, <<"x-priority">>) of {_, Priority} -> Priority; @@ -1227,7 +1229,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, - notify_federation(State1), + notify_decorators( + basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1404,8 +1407,8 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, end end); -handle_cast(notify_federation, State) -> - notify_federation(State), +handle_cast(notify_decorators, State) -> + notify_decorators(on_demand, [], State), noreply(State); handle_cast(wake_up, State) -> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 9ea13beb0d..d3dec9dc91 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,17 +8,24 @@ -ifdef(use_specs). +-callback startup(rabbit_types:amqqueue()) -> 'ok'. + +-callback shutdown(rabbit_types:amqqueue()) -> 'ok'. + -callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'. -callback active_for(rabbit_types:amqqueue()) -> boolean(). +-callback notify(rabbit_types:amqqueue(), atom(), any()) -> 'ok'. + -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {active_for, 1}, {policy_changed, 2}]; + [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, + {active_for, 1}, {notify, 3}]; behaviour_info(_Other) -> undefined. |
