diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-13 13:07:28 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-13 13:07:28 +0100 |
| commit | b9f4fbbdb6f1ad21d523f34ce6855f8951eb007c (patch) | |
| tree | 45b68a65a32b78142b8cc3a1b83ada27a6b2abf2 | |
| parent | 9772b7ebd0723521eaa3f4fb442d5e77375efea3 (diff) | |
| parent | 41c0dbfb3c6504f0b4d7e7da80bc21499f9de46d (diff) | |
| download | rabbitmq-server-git-b9f4fbbdb6f1ad21d523f34ce6855f8951eb007c.tar.gz | |
Merge default into bug 21673
| -rw-r--r-- | src/rabbit_amqqueue.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 |
3 files changed, 58 insertions, 51 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 41799a9210..7b88c45d26 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). @@ -146,12 +146,7 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - %% Issue inits to *all* the queues so that they all init at the same time - [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], - [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], - rabbit_misc:execute_mnesia_transaction( - fun () -> [ok = store_queue(Q) || Q <- Qs] end), - Qs. + [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -159,36 +154,34 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), - ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), - ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), - internal_declare(Q, true). - -internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case mnesia:read( - {rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - case WantDefaultBinding of - true -> add_default_binding(Q); - false -> ok - end, - Q; - [_] -> not_found %% existing Q on stopped node - end; - [ExistingQ] -> - ExistingQ - end - end) of - not_found -> exit(Q#amqqueue.pid, shutdown), - rabbit_misc:not_found(QueueName); - Q -> Q; - ExistingQ -> exit(Q#amqqueue.pid, shutdown), - ExistingQ + case gen_server2:call(Q#amqqueue.pid, {init, false}) of + not_found -> rabbit_misc:not_found(QueueName); + Q1 -> Q1 end. +internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case Recover of + true -> + ok = store_queue(Q), + Q; + false -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case mnesia:read({rabbit_durable_queue, + QueueName}) of + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), + Q; + [_] -> not_found %% Q exists on stopped node + end; + [ExistingQ] -> + ExistingQ + end + end + end). + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q, write), ok = mnesia:write(rabbit_queue, Q, write), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 06712e9c3d..f12e1b70f8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -103,7 +103,7 @@ init(Q) -> process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), - {ok, #q{q = Q, + {ok, #q{q = Q#amqqueue{pid = self()}, owner = none, exclusive_consumer = none, has_had_consumers = false, @@ -121,9 +121,13 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end, - State), - ok = rabbit_amqqueue:internal_delete(qname(State1)). + terminate_shutdown(fun (BQS) -> + BQS1 = BQ:delete_and_terminate(BQS), + %% don't care if the internal delete + %% doesn't return 'ok'. + rabbit_amqqueue:internal_delete(qname(State)), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -515,8 +519,27 @@ i(Item, _) -> %--------------------------------------------------------------------------- -handle_call(sync, _From, State) -> - reply(ok, State); +handle_call({init, Recover}, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> + %% TODO: If we're exclusively owned && our owner isn't alive && + %% Recover then we should BQ:init and then {stop, normal, + %% not_found, State}, relying on terminate to delete the queue. + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), + {rabbit_amqqueue, set_ram_duration_target, [self()]}), + noreply(State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end; handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -716,15 +739,6 @@ handle_call({claim_queue, ReaderPid}, _From, handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({init, Recover}, - State = #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); - handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1f16ec080e..a48db9c8b3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -138,7 +138,6 @@ info_all(Items) -> init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, @@ -353,6 +352,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> |
