diff options
| author | Simon MacMullen <simon@lshift.net> | 2010-05-20 15:23:06 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@lshift.net> | 2010-05-20 15:23:06 +0100 |
| commit | 0f03cdddb9e954ff30cef612dc14c140f264d57d (patch) | |
| tree | 9ee7a94eff9413b45ad7ff6e76ef586238c25b72 | |
| parent | 07d41db76a8817be05bca6d01e72d49b3774ca93 (diff) | |
| download | rabbitmq-server-git-0f03cdddb9e954ff30cef612dc14c140f264d57d.tar.gz | |
Cherry-pick a8d7d2ed4471 and e7ccda77c177 from bug22695.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 60 |
2 files changed, 40 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 29d8a00c85..7c2d558188 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -142,8 +142,6 @@ find_durable_queues() -> node(Pid) == Node])) end). -%% TODO this has not been merged from 268c69708cb655beae46b8f025602c1ecd205488 -%% but will be fixed when we merge bug22695 recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3e63451fc6..d731aa2208 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -517,27 +517,47 @@ handle_call({init, Recover}, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, exclusive_owner = ExclusiveOwner}, backing_queue = BQ, backing_queue_state = undefined}) -> + Declare = + fun() -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + noreply( + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end + end, + case ExclusiveOwner of - none -> ok; - ReaderPid -> erlang:monitor(process, ReaderPid) - end, - %% TODO: If we're exclusively owned && our owner isn't alive && - %% Recover then we should BQ:init and then {stop, normal, - %% not_found, State}, relying on terminate to delete the queue. - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), - {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} + none -> + Declare(); + Owner -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> + erlang:monitor(process, Owner), + Declare(); + _ -> + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", + [QName]) + end, + %% Rely on terminate to delete the queue. + {stop, normal, not_found, + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}} + end end; handle_call(info, _From, State) -> |
