diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-04-26 18:19:16 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-04-26 18:19:16 +0100 |
| commit | 61474916926f6ea04457aa43b7e09ae6efb2b651 (patch) | |
| tree | 0805ecd9f95e734276a33010a986509ef55706bd /src | |
| parent | 7c667f7005d66983188ae21c4e498ba219830f93 (diff) | |
| download | rabbitmq-server-git-61474916926f6ea04457aa43b7e09ae6efb2b651.tar.gz | |
simplify queue startup
by going through the same three-step sequence (start process,
init_backing_queue, sync) in both recovery and creation
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 |
2 files changed, 15 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e12bb7d47..668f4ae256 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -144,7 +144,7 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(Q, false) || Q <- DurableQueues], + Qs = [start_queue_process(Q) || Q <- DurableQueues], %% Issue inits to *all* the queues so that they all init at the same time [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs], [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], @@ -157,7 +157,9 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - pid = none}, true), + pid = none}), + ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue), + ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), internal_declare(Q, true). internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> @@ -194,8 +196,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q, InitBackingQueue) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q, InitBackingQueue]), +start_queue_process(Q) -> + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b10baacb46..10e1193f02 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --export([start_link/2, info_keys/0]). +-export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -94,14 +94,14 @@ %%---------------------------------------------------------------------------- -start_link(Q, InitBackingQueue) -> - gen_server2:start_link(?MODULE, [Q, InitBackingQueue], []). +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- -init([Q, InitBQ]) -> +init([Q]) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), ok = file_handle_cache:register_callback( @@ -115,7 +115,7 @@ init([Q, InitBQ]) -> exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, - backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q), + backing_queue_state = undefined, backing_queue_timeout_fun = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -123,12 +123,6 @@ init([Q, InitBQ]) -> rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -maybe_init_backing_queue( - true, BQ, #amqqueue{name = QName, durable = IsDurable}) -> - BQ:init(QName, IsDurable); -maybe_init_backing_queue(false, _BQ, _Q) -> - undefined. - terminate(shutdown, State) -> terminate_shutdown(terminate, State); terminate({shutdown, _}, State) -> @@ -731,11 +725,10 @@ handle_call({claim_queue, ReaderPid}, _From, handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). - -handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined, - backing_queue = BQ, q = Q}) -> - noreply(State#q{backing_queue_state = - maybe_init_backing_queue(true, BQ, Q)}); +handle_cast(init_backing_queue, + State = #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue_state = undefined, backing_queue = BQ}) -> + noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)}); handle_cast(init_backing_queue, State) -> noreply(State); |
