diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-25 17:50:58 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-26 17:11:20 +0100 |
| commit | fab1dcdf9ace49e01a1556cd6f3038075fbc3b66 (patch) | |
| tree | 6dd41dfb43f8fe7e6a04008b2f8f55fb43243dd1 /src | |
| parent | 574111eb0c0fe0c01cc0481df6c1f23de6871688 (diff) | |
| download | rabbitmq-server-git-fab1dcdf9ace49e01a1556cd6f3038075fbc3b66.tar.gz | |
Set queue state to 'stopped' when terminating.
If a queue process is stopped by a vhsot supervision tree, it should
be visible in the management UI and the `list_queues` command output.
Setting the queue state to `stopped` is easier to reason about than
checking the vhost aliveness status on a remote node.
If queue will be restarted or migrated to a different node, the status
will be set to `live`.
If a node was not stopped normally the queues will have `live`
state. If we cannot recover the queues thay should be marked
`stopped` to be reported to rabbitmqctl and management UI
Fixes #1303
[#148409695]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 |
3 files changed, 31 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff57593374..605634891e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,6 +40,7 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). -export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -255,6 +256,15 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +mark_local_durable_queues_stopped(VHost) -> + Qs = find_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(Q#amqqueue{ state = stopped }) + || Q = #amqqueue{ state = State } <- Qs, + State =/= stopped ] + end). + find_durable_queues(VHost) -> Node = node(), mnesia:async_dirty( @@ -452,6 +462,12 @@ with(Name, F, E, RetriesLeft) -> E({absent, Q, timeout}); {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + {ok, Q = #amqqueue{state = stopped}} -> + %% If the queue process was stopped by the supervisor + %% we don't want to retry an operation. + rabbit_misc:with_exit_handler( + fun () -> E({absent, Q, stopped}) + end, fun () -> F(Q) end); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -642,10 +658,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); +info(Q = #amqqueue{ state = stopped }, Items) -> + info_down(Q, Items, stopped); info(#amqqueue{ pid = QPid }, Items) -> case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e43104de2..678f1136c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + [Q] = mnesia:read({rabbit_queue, QName}), + Q2 = Q#amqqueue{state = stopped}, + rabbit_amqqueue:store_queue(Q2) + end), + BQ:terminate(R, BQS) + end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index e3c815a727..f6e4a83daa 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -55,6 +55,7 @@ init([VHost]) -> timer:send_interval(Interval, check_vhost), {ok, VHost} catch _:Reason -> + rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" " Stacktrace ~p", [VHost, Reason, erlang:get_stacktrace()]), |
