summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-20 15:23:06 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-20 15:23:06 +0100
commit0f03cdddb9e954ff30cef612dc14c140f264d57d (patch)
tree9ee7a94eff9413b45ad7ff6e76ef586238c25b72
parent07d41db76a8817be05bca6d01e72d49b3774ca93 (diff)
downloadrabbitmq-server-git-0f03cdddb9e954ff30cef612dc14c140f264d57d.tar.gz
Cherry-pick a8d7d2ed4471 and e7ccda77c177 from bug22695.
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl60
2 files changed, 40 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 29d8a00c85..7c2d558188 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -142,8 +142,6 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-%% TODO this has not been merged from 268c69708cb655beae46b8f025602c1ecd205488
-%% but will be fixed when we merge bug22695
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3e63451fc6..d731aa2208 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -517,27 +517,47 @@ handle_call({init, Recover}, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
exclusive_owner = ExclusiveOwner},
backing_queue = BQ, backing_queue_state = undefined}) ->
+ Declare =
+ fun() ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found ->
+ {stop, normal, not_found, State};
+ Q ->
+ gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ noreply(
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)});
+ Q1 ->
+ {stop, normal, Q1, State}
+ end
+ end,
+
case ExclusiveOwner of
- none -> ok;
- ReaderPid -> erlang:monitor(process, ReaderPid)
- end,
- %% TODO: If we're exclusively owned && our owner isn't alive &&
- %% Recover then we should BQ:init and then {stop, normal,
- %% not_found, State}, relying on terminate to delete the queue.
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(),
- {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
+ none ->
+ Declare();
+ Owner ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true ->
+ erlang:monitor(process, Owner),
+ Declare();
+ _ ->
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n",
+ [QName])
+ end,
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found,
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)}}
+ end
end;
handle_call(info, _From, State) ->