diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-11 13:50:28 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-11 13:50:28 +0100 |
| commit | 1180168d0267e16d0beb919907b96d558e0c1f37 (patch) | |
| tree | b638ed4bf5f9747ee4d14088ecd75a82a0adaa78 | |
| parent | 56f63e73b33fbec616e50c47ea8270d4dea5a656 (diff) | |
| download | rabbitmq-server-git-1180168d0267e16d0beb919907b96d558e0c1f37.tar.gz | |
Add crashed queues to the output of "rabbitmqctl list_queues". This is rather fiddly since we need to distinguish between queues which are crashed and those which are deleted in between us checking Mnesia and calling the queue pid. We do this by observing that the deleting queue removes its own Mnesia record, so we can check again and if the record is still there after the pid is down, it must have crashed. But this is still rather fiddly; and I wonder if reifying the crashedness in Mnesia would be cleaner.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 64 |
1 files changed, 47 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a4fa0cd3c4..34f231f881 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -557,31 +557,61 @@ info(#amqqueue{ pid = QPid }, Items) -> {error, Error} -> throw(Error) end. -info_down(Q) -> - info_down(Q, rabbit_amqqueue_process:info_keys()). - -info_down(Q, Items) -> [{Item, i_down(Item, Q)} || Item <- Items]. - -i_down(name, #amqqueue{name = Name}) -> Name; -i_down(durable, #amqqueue{durable = Durable}) -> Durable; -i_down(auto_delete, #amqqueue{auto_delete = AD}) -> AD; -i_down(arguments, #amqqueue{arguments = Args}) -> Args; -i_down(pid, #amqqueue{pid = QPid}) -> QPid; -i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}) -> DSN; -i_down(state, _Q) -> down; -i_down(K, _Q) -> +info_down(Q, DownReason) -> + info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). + +info_down(Q, Items, DownReason) -> + [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. + +i_down(name, #amqqueue{name = Name}, _) -> Name; +i_down(durable, #amqqueue{durable = Durable},_) -> Durable; +i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; +i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; +i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; +i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN; +i_down(state, _Q, DownReason) -> DownReason; +i_down(K, _Q, _DownReason) -> case lists:member(K, rabbit_amqqueue_process:info_keys()) of true -> ''; false -> throw({bad_argument, K}) end. info_all(VHostPath) -> - map(list(VHostPath), fun (Q) -> info(Q) end) ++ - map(list_down(VHostPath), fun (Q) -> info_down(Q) end). + map_with_crashed(fun (Q) -> info(Q) end, + fun (Q) -> info_down(Q, crashed) end, + list(VHostPath)) ++ + [info_down(Q, down) || Q <- list_down(VHostPath)]. info_all(VHostPath, Items) -> - map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ - map(list_down(VHostPath), fun (Q) -> info_down(Q, Items) end). + map_with_crashed(fun (Q) -> info(Q, Items) end, + fun (Q) -> info_down(Q, Items, crashed) end, + list(VHostPath)) ++ + [info_down(Q, Items, down) || Q <- list_down(VHostPath)]. + +map_with_crashed(F, FCrashed, Qs) -> + {Bad, Good} = lists:partition( + fun ({failed, _, _}) -> true; + (_) -> false + end, + [rabbit_misc:with_exit_handler( + fun () -> {failed, QPid, Name} end, + fun () -> F(Q) end) || + Q = #amqqueue{pid = QPid, name = Name} <- Qs]), + Qs2 = lookup([N || {failed, _, N} <- Bad]), + Good ++ [FCrashed(Q) || Q <- filter_crashed(Bad, Qs2, [])]. + +filter_crashed([], [], Acc) -> Acc; +%% The queue is still there with same pid -> crashed +filter_crashed([{failed, QPid, Name} | Rest], + [Q = #amqqueue{pid = QPid, name = Name} | Qs], Acc) -> + filter_crashed(Rest, Qs, [Q | Acc]); +%% The queue is still there with different pid -> recovered +filter_crashed([{failed, _QPid, Name} | Rest], + [#amqqueue{name = Name} | Qs], Acc) -> + filter_crashed(Rest, Qs, Acc); +%% The queue is not there -> deleted +filter_crashed([{failed, _QPid, _Name} | Rest], Qs, Acc) -> + filter_crashed(Rest, Qs, Acc). force_event_refresh(Ref) -> [gen_server2:cast(Q#amqqueue.pid, |
