summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_amqqueue_process.erl60
2 files changed, 40 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index dc37c835dc..b237be8a48 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -145,49 +145,6 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-
-%% This changed too radically to merge. We'll fix this later; see bug 22695
-%% recover_durable_queues(DurableQueues) ->
-%% lists:foldl(
-%% fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner },
-%% Acc) ->
-%% %% We need to catch the case where a client connected to
-%% %% another node has deleted the queue (and possibly
-%% %% re-created it).
-%% DoIfSameQueue =
-%% fun (Action) ->
-%% rabbit_misc:execute_mnesia_transaction(
-%% fun () -> case mnesia:match_object(
-%% rabbit_durable_queue, RecoveredQ, read) of
-%% [_] -> {true, Action()};
-%% [] -> false
-%% end
-%% end)
-%% end,
-%% case shared_or_live_owner(Owner) of
-%% true ->
-%% Q = start_queue_process(RecoveredQ),
-%% case DoIfSameQueue(fun () -> store_queue(Q) end) of
-%% {true, ok} -> [Q | Acc];
-%% false -> exit(Q#amqqueue.pid, shutdown),
-%% Acc
-%% end;
-%% false ->
-%% case DoIfSameQueue(
-%% fun () ->
-%% internal_delete2(RecoveredQ#amqqueue.name)
-%% end) of
-%% {true, Hook} -> Hook();
-%% false -> ok
-%% end,
-%% Acc
-%% end
-%% end, [], DurableQueues).
-
-%% shared_or_live_owner(none) ->
-%% true;
-%% shared_or_live_owner(Owner) when is_pid(Owner) ->
-%% rpc:call(node(Owner), erlang, is_process_alive, [Owner]).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 92c21fa667..ff1b8f1baa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -517,27 +517,47 @@ handle_call({init, Recover}, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
exclusive_owner = ExclusiveOwner},
backing_queue = BQ, backing_queue_state = undefined}) ->
+ Declare =
+ fun() ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found ->
+ {stop, normal, not_found, State};
+ Q ->
+ gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ noreply(
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)});
+ Q1 ->
+ {stop, normal, Q1, State}
+ end
+ end,
+
case ExclusiveOwner of
- none -> ok;
- ReaderPid -> erlang:monitor(process, ReaderPid)
- end,
- %% TODO: If we're exclusively owned && our owner isn't alive &&
- %% Recover then we should BQ:init and then {stop, normal,
- %% not_found, State}, relying on terminate to delete the queue.
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(),
- {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
+ none ->
+ Declare();
+ Owner ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true ->
+ erlang:monitor(process, Owner),
+ Declare();
+ _ ->
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n",
+ [QName])
+ end,
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found,
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)}}
+ end
end;
handle_call(info, _From, State) ->