summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-26 18:19:16 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-26 18:19:16 +0100
commit61474916926f6ea04457aa43b7e09ae6efb2b651 (patch)
tree0805ecd9f95e734276a33010a986509ef55706bd /src
parent7c667f7005d66983188ae21c4e498ba219830f93 (diff)
downloadrabbitmq-server-git-61474916926f6ea04457aa43b7e09ae6efb2b651.tar.gz
simplify queue startup
by going through the same three-step sequence (start process, init_backing_queue, sync) in both recovery and creation
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl25
2 files changed, 15 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4e12bb7d47..668f4ae256 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -144,7 +144,7 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(Q, false) || Q <- DurableQueues],
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
%% Issue inits to *all* the queues so that they all init at the same time
[ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs],
[ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
@@ -157,7 +157,9 @@ declare(QueueName, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
- pid = none}, true),
+ pid = none}),
+ ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue),
+ ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -194,8 +196,8 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-start_queue_process(Q, InitBackingQueue) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child([Q, InitBackingQueue]),
+start_queue_process(Q) ->
+ {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b10baacb46..10e1193f02 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--export([start_link/2, info_keys/0]).
+-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -94,14 +94,14 @@
%%----------------------------------------------------------------------------
-start_link(Q, InitBackingQueue) ->
- gen_server2:start_link(?MODULE, [Q, InitBackingQueue], []).
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
-init([Q, InitBQ]) ->
+init([Q]) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(
@@ -115,7 +115,7 @@ init([Q, InitBQ]) ->
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
- backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q),
+ backing_queue_state = undefined,
backing_queue_timeout_fun = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -123,12 +123,6 @@ init([Q, InitBQ]) ->
rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-maybe_init_backing_queue(
- true, BQ, #amqqueue{name = QName, durable = IsDurable}) ->
- BQ:init(QName, IsDurable);
-maybe_init_backing_queue(false, _BQ, _Q) ->
- undefined.
-
terminate(shutdown, State) ->
terminate_shutdown(terminate, State);
terminate({shutdown, _}, State) ->
@@ -731,11 +725,10 @@ handle_call({claim_queue, ReaderPid}, _From,
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-
-handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined,
- backing_queue = BQ, q = Q}) ->
- noreply(State#q{backing_queue_state =
- maybe_init_backing_queue(true, BQ, Q)});
+handle_cast(init_backing_queue,
+ State = #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue_state = undefined, backing_queue = BQ}) ->
+ noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)});
handle_cast(init_backing_queue, State) ->
noreply(State);