summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl47
-rw-r--r--src/rabbit_amqqueue_process.erl138
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_prequeue.erl104
4 files changed, 179 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 692179fce2..e625572edc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
cancel_sync_mirrors/1]).
%% internal
--export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/1, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -76,9 +76,9 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
-> {'new' | 'existing' | 'absent' | 'owner_died',
rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
--spec(internal_declare/2 ::
- (rabbit_types:amqqueue(), boolean())
- -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
+%% -spec(internal_declare/2 ::
+%% (rabbit_types:amqqueue(), boolean())
+%% -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue()))
@@ -196,6 +196,8 @@
arguments]).
recover() ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_recovery, Marker),
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
@@ -212,7 +214,11 @@ recover() ->
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
+ Recovered = recover_durable_queues(
+ lists:zip(DurableQueues, OrderedRecoveryTerms)),
+ unlink(Marker),
+ Marker ! stop,
+ Recovered.
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -271,29 +277,14 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
-internal_declare(Q, true) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
-internal_declare(Q = #amqqueue{name = QueueName}, false) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case not_found_or_absent(QueueName) of
- not_found -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
- B = add_default_binding(Q1),
- fun () -> B(), Q1 end;
- {absent, _Q} = R -> rabbit_misc:const(R)
- end;
- [ExistingQ = #amqqueue{pid = QPid}] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
- fun () -> TailFun(), ExistingQ end
- end
- end
- end).
+internal_declare(Q = #amqqueue{name = QueueName}) ->
+ case not_found_or_absent(QueueName) of
+ not_found -> ok = store_queue(Q),
+ B = add_default_binding(Q),
+ %% TODO can we simplify return here?
+ {new, fun () -> B(), Q end};
+ {absent, _Q} = R -> R
+ end.
update(Name, Fun) ->
case mnesia:wread({rabbit_queue, Name}) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index db297c1daa..1c982dbb92 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -24,9 +24,9 @@
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
--export([start_link/1, info_keys/0]).
+-export([info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_declared/3, init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -61,8 +61,8 @@
-ifdef(use_specs).
--spec(start_link/1 ::
- (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
+%% -spec(start_link/1 ::
+%% (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(init_with_backing_queue_state/7 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(),
@@ -102,19 +102,64 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-
info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
-init(Q) ->
+init(_) ->
+ exit(cannot_be_called_directly).
+
+%% We have just been declared or recovered
+init_declared(Recover, From, Q = #amqqueue{name = QName,
+ exclusive_owner = Owner}) ->
process_flag(trap_exit, true),
- ?store_proc_name(Q#amqqueue.name),
- {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ ?store_proc_name(QName),
+ State = init_state(Q),
+ case Owner of
+ none -> finish_init(Recover, From, State);
+ _ -> case rabbit_misc:is_process_alive(Owner) of %% [1]
+ true -> erlang:monitor(process, Owner),
+ finish_init(Recover, From, State);
+ false -> gen_server2:reply(From, {owner_died, Q}),
+ BQ = backing_queue_module(Q),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
+ %% Rely on terminate to delete the queue.
+ {stop, {shutdown, missing_owner},
+ State#q{backing_queue = BQ,
+ backing_queue_state = BQS}}
+ end
+ end.
+%% [1] You used to be able to declare an exclusive durable
+%% queue. Sadly we need to still tidy up after that case, there could
+%% be the remnants of one left over from an upgrade. So that's why we
+%% don't enforce Recover = new here.
+
+finish_init(Recover, From, State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
+ {Recovery, TermsOrNew} = recovery_status(Recover),
+ gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ BQ = backing_queue_module(Q),
+ BQS = bq_init(BQ, Q, TermsOrNew),
+ recovery_barrier(Recovery),
+ State1 = process_args_policy(State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
+ notify_decorators(startup, State1),
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {become, ?MODULE, State1, hibernate}.
+
+recovery_status(new) -> {new, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
+%% We have been promoted
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, Deliveries, Senders, MTC) ->
case Owner of
@@ -174,54 +219,6 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
- {Recovery, TermsOrNew} = recovery_status(Recover),
- case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
- #amqqueue{} = Q1 ->
- case matches(Recovery, Q, Q1) of
- true ->
- gen_server2:reply(From, {new, Q}),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- BQ = backing_queue_module(Q1),
- BQS = bq_init(BQ, Q, TermsOrNew),
- recovery_barrier(Recovery),
- State1 = process_args_policy(
- State#q{backing_queue = BQ,
- backing_queue_state = BQS}),
- notify_decorators(startup, State),
- rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- noreply(State1);
- false ->
- {stop, normal, {existing, Q1}, State}
- end;
- Err ->
- {stop, normal, Err, State}
- end.
-
-recovery_status(new) -> {new, new};
-recovery_status({Recover, Terms}) -> {Recover, Terms}.
-
-matches(new, Q1, Q2) ->
- %% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
- Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
-matches(_, Q, Q) -> true;
-matches(_, _Q, _Q1) -> false.
-
maybe_notify_decorators(false, State) -> State;
maybe_notify_decorators(true, State) -> notify_decorators(State), State.
@@ -915,31 +912,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = none}}) ->
- declare(Recover, From, State);
-
-%% You used to be able to declare an exclusive durable queue. Sadly we
-%% need to still tidy up after that case, there could be the remnants
-%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = new here.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rabbit_misc:is_process_alive(Owner) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- false -> #q{backing_queue = undefined,
- backing_queue_state = undefined,
- q = Q} = State,
- gen_server2:reply(From, {owner_died, Q}),
- BQ = backing_queue_module(Q),
- {_, Terms} = recovery_status(Recover),
- BQS = bq_init(BQ, Q, Terms),
- %% Rely on terminate to delete the queue.
- {stop, {shutdown, missing_owner},
- State#q{backing_queue = BQ, backing_queue_state = BQS}}
- end;
-
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0fd64c2671..137422d490 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -48,5 +48,5 @@ start_child(Node, Args) ->
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
- [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
+ [{rabbit_amqqueue, {rabbit_prequeue, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
new file mode 100644
index 0000000000..148f5968fa
--- /dev/null
+++ b/src/rabbit_prequeue.erl
@@ -0,0 +1,104 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_prequeue).
+
+%% This is the initial gen_server that all queue processes start off
+%% as. It handles the decision as to whether we need to start a new
+%% slave, a new master/unmirrored, whether we lost a race to declare a
+%% new queue, or whether we are in recovery. Thus a crashing queue
+%% process can restart from here and always do the right thing.
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-behaviour(gen_server2).
+
+-include("rabbit.hrl").
+
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, Q, []).
+
+%%----------------------------------------------------------------------------
+
+init(Q) ->
+ %% Hand back to supervisor ASAP
+ gen_server2:cast(self(), init),
+ {ok, Q#amqqueue{pid = self()}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}}.
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(init, Q) ->
+ case whereis(rabbit_recovery) of
+ undefined -> init_non_recovery(Q);
+ _Pid -> init_recovery(Q)
+ end;
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+init_non_recovery(Q = #amqqueue{name = QueueName}) ->
+ Result = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ {decl, rabbit_amqqueue:internal_declare(Q)};
+ [ExistingQ = #amqqueue{pid = QPid}] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> {decl, {existing, ExistingQ}};
+ false -> exit(todo)
+ end
+ end
+ end),
+ case Result of
+ {decl, DeclResult} ->
+ %% We have just been declared. Block waiting for an init
+ %% call so that we don't respond to any other message first
+ receive {'$gen_call', From, {init, new}} ->
+ case DeclResult of
+ {new, Fun} ->
+ Q1 = Fun(),
+ rabbit_amqqueue_process:init_declared(new,From, Q1);
+ {F, _} when F =:= absent; F =:= existing ->
+ gen_server2:reply(From, DeclResult),
+ {stop, normal, Q}
+ end
+ end
+ end.
+
+init_recovery(Q) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok = rabbit_amqqueue:store_queue(Q) end),
+ %% Again block waiting for an init call.
+ receive {'$gen_call', From, {init, Terms}} ->
+ rabbit_amqqueue_process:init_declared(Terms, From, Q)
+ end.