diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 |
4 files changed, 30 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff57593374..5537634144 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,6 +40,7 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). -export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -255,6 +256,15 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +mark_local_durable_queues_stopped(VHost) -> + Qs = find_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(Q#amqqueue{ state = stopped }) + || Q = #amqqueue{ state = State } <- Qs, + State =/= stopped ] + end). + find_durable_queues(VHost) -> Node = node(), mnesia:async_dirty( @@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) -> E({absent, Q, timeout}); {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + {ok, Q = #amqqueue{state = stopped}} -> + %% The queue process was stopped by the supervisor + E({absent, Q, stopped}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); +info(Q = #amqqueue{ state = stopped }, Items) -> + info_down(Q, Items, stopped); info(#amqqueue{ pid = QPid }, Items) -> case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e43104de2..678f1136c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + [Q] = mnesia:read({rabbit_queue, QName}), + Q2 = Q#amqqueue{state = stopped}, + rabbit_amqqueue:store_queue(Q2) + end), + BQ:terminate(R, BQS) + end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 00a6607dfb..c69a27d57c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin, fun (not_found) -> {ok, 0}; ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), {ok, 0}; + ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index e3c815a727..f6e4a83daa 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -55,6 +55,7 @@ init([VHost]) -> timer:send_interval(Interval, check_vhost), {ok, VHost} catch _:Reason -> + rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" " Stacktrace ~p", [VHost, Reason, erlang:get_stacktrace()]), |
