diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 47 |
1 files changed, 39 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fa7491104b..63cb12e395 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -215,7 +215,8 @@ recover() -> %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), - DurableQueues = find_durable_queues(), + DurableQueues = queues_to_recover(), + L = length(DurableQueues), %% if there are not enough file handles, the server might hang @@ -257,6 +258,31 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +queues_to_recover() -> + DurableQueues = find_durable_queues(), + VHosts = rabbit_vhost:list(), + + {QueuesWithVhost, QueuesWithoutVhost} = lists:partition( + fun(#amqqueue{name = #resource{virtual_host = VHost}}) -> + lists:member(VHost, VHosts) + end, + DurableQueues), + + {LocalQueuesWithoutVhost, _RemoteQueuesWithoutVhost} = lists:partition( + fun(#amqqueue{pid = QPid}) -> node(QPid) == node() end, + QueuesWithoutVhost), + + {atomic, ok} = + mnesia:sync_transaction( + fun () -> + rabbit_log:error("Deleting ~p~n", [LocalQueuesWithoutVhost]), + [ internal_delete1(Name, false) + || #amqqueue{name = Name} <- LocalQueuesWithoutVhost ], + ok + end), + + QueuesWithVhost. + find_durable_queues() -> Node = node(), mnesia:async_dirty( @@ -576,7 +602,8 @@ list_local_names() -> State =/= crashed, node() =:= node(QPid) ]. -list(VHostPath) -> list(VHostPath, rabbit_queue). +list(VHostPath) -> + list(VHostPath, rabbit_queue). %% Not dirty_match_object since that would not be transactional when used in a %% tx context @@ -590,12 +617,16 @@ list(VHostPath, TableName) -> end). list_down(VHostPath) -> - Present = list(VHostPath), - Durable = list(VHostPath, rabbit_durable_queue), - PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), - sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> - not sets:is_element(N, PresentS) - end, sets:from_list(Durable))). + case rabbit_vhost:exists(VHostPath) of + false -> []; + true -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), + sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))) + end. info_keys() -> rabbit_amqqueue_process:info_keys(). |
