diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-12 15:26:37 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-12 15:26:37 +0100 |
| commit | f5bb52f0b28e7c628290e7596ea866c0dcc2f8aa (patch) | |
| tree | 991b781753ec79d5c0c8fcce49d851c21a73aa0c /src | |
| parent | 42092de9b1f3fa73b5b439bf6cdb2c5cece60331 (diff) | |
| parent | 8ae5ab9ad31f9d07be04c1c08f7bfeb7f0d2ec84 (diff) | |
| download | rabbitmq-server-git-f5bb52f0b28e7c628290e7596ea866c0dcc2f8aa.tar.gz | |
Merging bug 22695 into amq_0_9_1
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 60 |
2 files changed, 40 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index dc37c835dc..b237be8a48 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -145,49 +145,6 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. - -%% This changed too radically to merge. We'll fix this later; see bug 22695 -%% recover_durable_queues(DurableQueues) -> -%% lists:foldl( -%% fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }, -%% Acc) -> -%% %% We need to catch the case where a client connected to -%% %% another node has deleted the queue (and possibly -%% %% re-created it). -%% DoIfSameQueue = -%% fun (Action) -> -%% rabbit_misc:execute_mnesia_transaction( -%% fun () -> case mnesia:match_object( -%% rabbit_durable_queue, RecoveredQ, read) of -%% [_] -> {true, Action()}; -%% [] -> false -%% end -%% end) -%% end, -%% case shared_or_live_owner(Owner) of -%% true -> -%% Q = start_queue_process(RecoveredQ), -%% case DoIfSameQueue(fun () -> store_queue(Q) end) of -%% {true, ok} -> [Q | Acc]; -%% false -> exit(Q#amqqueue.pid, shutdown), -%% Acc -%% end; -%% false -> -%% case DoIfSameQueue( -%% fun () -> -%% internal_delete2(RecoveredQ#amqqueue.name) -%% end) of -%% {true, Hook} -> Hook(); -%% false -> ok -%% end, -%% Acc -%% end -%% end, [], DurableQueues). - -%% shared_or_live_owner(none) -> -%% true; -%% shared_or_live_owner(Owner) when is_pid(Owner) -> -%% rpc:call(node(Owner), erlang, is_process_alive, [Owner]). declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 92c21fa667..ff1b8f1baa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -517,27 +517,47 @@ handle_call({init, Recover}, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, exclusive_owner = ExclusiveOwner}, backing_queue = BQ, backing_queue_state = undefined}) -> + Declare = + fun() -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + noreply( + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end + end, + case ExclusiveOwner of - none -> ok; - ReaderPid -> erlang:monitor(process, ReaderPid) - end, - %% TODO: If we're exclusively owned && our owner isn't alive && - %% Recover then we should BQ:init and then {stop, normal, - %% not_found, State}, relying on terminate to delete the queue. - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), - {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} + none -> + Declare(); + Owner -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> + erlang:monitor(process, Owner), + Declare(); + _ -> + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", + [QName]) + end, + %% Rely on terminate to delete the queue. + {stop, normal, not_found, + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}} + end end; handle_call(info, _From, State) -> |
