diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 138 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_prequeue.erl | 104 |
4 files changed, 179 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 692179fce2..e625572edc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ cancel_sync_mirrors/1]). %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/1, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -76,9 +76,9 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). --spec(internal_declare/2 :: - (rabbit_types:amqqueue(), boolean()) - -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). +%% -spec(internal_declare/2 :: +%% (rabbit_types:amqqueue(), boolean()) +%% -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) @@ -196,6 +196,8 @@ arguments]). recover() -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_recovery, Marker), %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -212,7 +214,11 @@ recover() -> {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). + Recovered = recover_durable_queues( + lists:zip(DurableQueues, OrderedRecoveryTerms)), + unlink(Marker), + Marker ! stop, + Recovered. stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -271,29 +277,14 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). -internal_declare(Q, true) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); -internal_declare(Q = #amqqueue{name = QueueName}, false) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case not_found_or_absent(QueueName) of - not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - {absent, _Q} = R -> rabbit_misc:const(R) - end; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), - fun () -> TailFun(), ExistingQ end - end - end - end). +internal_declare(Q = #amqqueue{name = QueueName}) -> + case not_found_or_absent(QueueName) of + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + %% TODO can we simplify return here? + {new, fun () -> B(), Q end}; + {absent, _Q} = R -> R + end. update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index db297c1daa..1c982dbb92 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,9 @@ -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster --export([start_link/1, info_keys/0]). +-export([info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_declared/3, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -61,8 +61,8 @@ -ifdef(use_specs). --spec(start_link/1 :: - (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). +%% -spec(start_link/1 :: +%% (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(init_with_backing_queue_state/7 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), @@ -102,19 +102,64 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(Q) -> +init(_) -> + exit(cannot_be_called_directly). + +%% We have just been declared or recovered +init_declared(Recover, From, Q = #amqqueue{name = QName, + exclusive_owner = Owner}) -> process_flag(trap_exit, true), - ?store_proc_name(Q#amqqueue.name), - {ok, init_state(Q#amqqueue{pid = self()}), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + ?store_proc_name(QName), + State = init_state(Q), + case Owner of + none -> finish_init(Recover, From, State); + _ -> case rabbit_misc:is_process_alive(Owner) of %% [1] + true -> erlang:monitor(process, Owner), + finish_init(Recover, From, State); + false -> gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, + backing_queue_state = BQS}} + end + end. +%% [1] You used to be able to declare an exclusive durable +%% queue. Sadly we need to still tidy up after that case, there could +%% be the remnants of one left over from an upgrade. So that's why we +%% don't enforce Recover = new here. + +finish_init(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> + {Recovery, TermsOrNew} = recovery_status(Recover), + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), + State1 = process_args_policy(State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, State1), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + {become, ?MODULE, State1, hibernate}. + +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. +%% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> case Owner of @@ -174,54 +219,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of - #amqqueue{} = Q1 -> - case matches(Recovery, Q, Q1) of - true -> - gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), - State1 = process_args_policy( - State#q{backing_queue = BQ, - backing_queue_state = BQS}), - notify_decorators(startup, State), - rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - false -> - {stop, normal, {existing, Q1}, State} - end; - Err -> - {stop, normal, Err, State} - end. - -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. - -matches(new, Q1, Q2) -> - %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso - Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; -matches(_, Q, Q) -> true; -matches(_, _Q, _Q1) -> false. - maybe_notify_decorators(false, State) -> State; maybe_notify_decorators(true, State) -> notify_decorators(State), State. @@ -915,31 +912,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - declare(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; - handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0fd64c2671..137422d490 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -48,5 +48,5 @@ start_child(Node, Args) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, + [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl new file mode 100644 index 0000000000..148f5968fa --- /dev/null +++ b/src/rabbit_prequeue.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_prequeue). + +%% This is the initial gen_server that all queue processes start off +%% as. It handles the decision as to whether we need to start a new +%% slave, a new master/unmirrored, whether we lost a race to declare a +%% new queue, or whether we are in recovery. Thus a crashing queue +%% process can restart from here and always do the right thing. + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-behaviour(gen_server2). + +-include("rabbit.hrl"). + +start_link(Q) -> + gen_server2:start_link(?MODULE, Q, []). + +%%---------------------------------------------------------------------------- + +init(Q) -> + %% Hand back to supervisor ASAP + gen_server2:cast(self(), init), + {ok, Q#amqqueue{pid = self()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(init, Q) -> + case whereis(rabbit_recovery) of + undefined -> init_non_recovery(Q); + _Pid -> init_recovery(Q) + end; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +init_non_recovery(Q = #amqqueue{name = QueueName}) -> + Result = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + {decl, rabbit_amqqueue:internal_declare(Q)}; + [ExistingQ = #amqqueue{pid = QPid}] -> + case rabbit_misc:is_process_alive(QPid) of + true -> {decl, {existing, ExistingQ}}; + false -> exit(todo) + end + end + end), + case Result of + {decl, DeclResult} -> + %% We have just been declared. Block waiting for an init + %% call so that we don't respond to any other message first + receive {'$gen_call', From, {init, new}} -> + case DeclResult of + {new, Fun} -> + Q1 = Fun(), + rabbit_amqqueue_process:init_declared(new,From, Q1); + {F, _} when F =:= absent; F =:= existing -> + gen_server2:reply(From, DeclResult), + {stop, normal, Q} + end + end + end. + +init_recovery(Q) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok = rabbit_amqqueue:store_queue(Q) end), + %% Again block waiting for an init call. + receive {'$gen_call', From, {init, Terms}} -> + rabbit_amqqueue_process:init_declared(Terms, From, Q) + end. |
