summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-13 13:07:28 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-13 13:07:28 +0100
commitb9f4fbbdb6f1ad21d523f34ce6855f8951eb007c (patch)
tree45b68a65a32b78142b8cc3a1b83ada27a6b2abf2 /src
parent9772b7ebd0723521eaa3f4fb442d5e77375efea3 (diff)
parent41c0dbfb3c6504f0b4d7e7da80bc21499f9de46d (diff)
downloadrabbitmq-server-git-b9f4fbbdb6f1ad21d523f34ce6855f8951eb007c.tar.gz
Merge default into bug 21673
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl63
-rw-r--r--src/rabbit_amqqueue_process.erl44
-rw-r--r--src/rabbit_channel.erl2
3 files changed, 58 insertions, 51 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41799a9210..7b88c45d26 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
@@ -146,12 +146,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- %% Issue inits to *all* the queues so that they all init at the same time
- [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
- [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
- rabbit_misc:execute_mnesia_transaction(
- fun () -> [ok = store_queue(Q) || Q <- Qs] end),
- Qs.
+ [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -159,36 +154,34 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
- ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
- ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
- internal_declare(Q, true).
-
-internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case mnesia:read(
- {rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
- Q;
- [_] -> not_found %% existing Q on stopped node
- end;
- [ExistingQ] ->
- ExistingQ
- end
- end) of
- not_found -> exit(Q#amqqueue.pid, shutdown),
- rabbit_misc:not_found(QueueName);
- Q -> Q;
- ExistingQ -> exit(Q#amqqueue.pid, shutdown),
- ExistingQ
+ case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ not_found -> rabbit_misc:not_found(QueueName);
+ Q1 -> Q1
end.
+internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case Recover of
+ true ->
+ ok = store_queue(Q),
+ Q;
+ false ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case mnesia:read({rabbit_durable_queue,
+ QueueName}) of
+ [] -> ok = store_queue(Q),
+ ok = add_default_binding(Q),
+ Q;
+ [_] -> not_found %% Q exists on stopped node
+ end;
+ [ExistingQ] ->
+ ExistingQ
+ end
+ end
+ end).
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q, write),
ok = mnesia:write(rabbit_queue, Q, write),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06712e9c3d..f12e1b70f8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -103,7 +103,7 @@ init(Q) ->
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, #q{q = Q,
+ {ok, #q{q = Q#amqqueue{pid = self()},
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
@@ -121,9 +121,13 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end,
- State),
- ok = rabbit_amqqueue:internal_delete(qname(State1)).
+ terminate_shutdown(fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(BQS),
+ %% don't care if the internal delete
+ %% doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(qname(State)),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -515,8 +519,27 @@ i(Item, _) ->
%---------------------------------------------------------------------------
-handle_call(sync, _From, State) ->
- reply(ok, State);
+handle_call({init, Recover}, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ %% TODO: If we're exclusively owned && our owner isn't alive &&
+ %% Recover then we should BQ:init and then {stop, normal,
+ %% not_found, State}, relying on terminate to delete the queue.
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found ->
+ {stop, normal, not_found, State};
+ Q ->
+ gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(),
+ {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ noreply(State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)});
+ Q1 ->
+ {stop, normal, Q1, State}
+ end;
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -716,15 +739,6 @@ handle_call({claim_queue, ReaderPid}, _From,
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({init, Recover},
- State = #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)});
-
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1f16ec080e..a48db9c8b3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -138,7 +138,6 @@ info_all(Items) ->
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
@@ -353,6 +352,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->