diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_prequeue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 8 |
8 files changed, 84 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0ad6af51e4..4906b252b6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,7 +17,9 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, declare/6, - delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). + delete_immediately/1, delete/3, + delete_crashed/1, delete_crashed_internal/1, + purge/1, forget_all_durable/1]). -export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -49,7 +51,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, absent_reason/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -59,8 +61,9 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(not_found_or_absent() :: 'not_found' | - {'absent', rabbit_types:amqqueue()}). +-type(absent_reason() :: 'nodedown' | 'crashed'). +-type(not_found_or_absent() :: + 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}). -spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok'). @@ -72,8 +75,9 @@ -spec(declare/6 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) - -> {'new' | 'existing' | 'absent' | 'owner_died', - rabbit_types:amqqueue()} | rabbit_types:channel_exit()). + -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | + {'absent', rabbit_types:amqqueue(), absent_reason()} | + rabbit_types:channel_exit()). -spec(internal_declare/1 :: (rabbit_types:amqqueue()) -> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} | @@ -137,6 +141,8 @@ -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty')). +-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok'). 
+-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> @@ -274,10 +280,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of - not_found -> ok = store_queue(Q), - B = add_default_binding(Q), - {new, fun () -> B(), Q end}; - {absent, _Q} = R -> R + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + {new, fun () -> B(), Q end}; + {absent, _Q, _Reason} = R -> R end. update(Name, Fun) -> @@ -349,7 +355,7 @@ not_found_or_absent(Name) -> %% rabbit_queue and not found anything case mnesia:read({rabbit_durable_queue, Name}) of [] -> not_found; - [Q] -> {absent, Q} %% Q exists on stopped node + [Q] -> {absent, Q, nodedown} %% Q exists on stopped node end. not_found_or_absent_dirty(Name) -> @@ -358,7 +364,7 @@ not_found_or_absent_dirty(Name) -> %% and only affect the error kind. case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of {error, not_found} -> not_found; - {ok, Q} -> {absent, Q} + {ok, Q} -> {absent, Q, nodedown} end. with(Name, F, E) -> @@ -372,8 +378,11 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - timer:sleep(25), - with(Name, F, E) + case crashed_or_recovering(Q) of + crashed -> E({absent, Q, crashed}); + recovering -> timer:sleep(25), + with(Name, F, E) + end end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -382,10 +391,22 @@ with(Name, F, E) -> with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). 
with_or_die(Name, F) -> - with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); - ({absent, Q}) -> rabbit_misc:absent(Q) + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). +%% TODO we could still be wrong here if we happen to call in the +%% middle of a crash-failover. We could try to figure out whether +%% that's happening by looking for the supervisor - but we'd need some +%% additional book keeping to know what it is... +crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> + case lists:member(node(QPid), [node() | nodes()]) of + true -> crashed; + false -> recovering + end; +crashed_or_recovering(_Q) -> + recovering. + assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> @@ -546,6 +567,14 @@ delete_immediately(QPids) -> delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate:call(QPid, {delete, IfUnused, IfEmpty}). +delete_crashed(#amqqueue{ pid = QPid } = Q) -> + ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). + +delete_crashed_internal(#amqqueue{ name = QName }) -> + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + BQ:delete_crashed(QName), + ok = internal_delete(QName). + purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 098f5f4342..310b82207d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -85,6 +85,10 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). +%% Called to clean up after a crashed queue. In this case we don't +%% have a process and thus a state(), we are just removing on-disk data. +-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'. + %% Remove all 'fetchable' messages from the queue, i.e. 
all messages %% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index d887f26a45..12082af8ec 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) -> absent_errs_only(Names) -> Errs = [E || Name <- Names, - {absent, _Q} = E <- [not_found_or_absent(Name)]], + {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]], rabbit_misc:const(case Errs of [] -> ok; _ -> {error, {resources_missing, Errs}} @@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) -> {not_found, Name}; not_found_or_absent(#resource{kind = queue} = Name) -> case rabbit_amqqueue:not_found_or_absent(Name) of - not_found -> {not_found, Name}; - {absent, _Q} = R -> R + not_found -> {not_found, Name}; + {absent, _Q, _Reason} = R -> R end. contains(Table, MatchHead) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e5a90410e1..fc43389810 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% must have been created between the stat and the %% declare. Loop around again. handle_method(Declare, none, State); - {absent, Q} -> - rabbit_misc:absent(Q); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); {owner_died, _Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. 
return_queue_declare_ok(QueueName, NoWait, 0, 0, State) end; - {error, {absent, Q}} -> - rabbit_misc:absent(Q) + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) end; handle_method(#'queue.declare'{queue = QueueNameBin, @@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin, rabbit_amqqueue:check_exclusive_access(Q, ConnPid), rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end, - fun (not_found) -> {ok, 0}; - ({absent, Q}) -> rabbit_misc:absent(Q) + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); @@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end) of {error, {resources_missing, [{not_found, Name} | _]}} -> rabbit_misc:not_found(Name); - {error, {resources_missing, [{absent, Q} | _]}} -> - rabbit_misc:absent(Q); + {error, {resources_missing, [{absent, Q, Reason} | _]}} -> + rabbit_misc:absent(Q, Reason); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c4148bbfc5..cf3a2edbaa 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, absent/1]). +-export([not_found/1, absent/2]). -export([type_class/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). @@ -119,7 +119,8 @@ -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). --spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). 
+-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason()) + -> rabbit_types:channel_exit()). -spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), @@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). -absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) -> +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) -> %% The assertion of durability is mainly there because we mention %% durability in the error message. That way we will hopefully %% notice if at some future point our logic changes s.t. we get %% here with non-durable queues. protocol_error(not_found, "home node '~s' of durable ~s is down or inaccessible", - [node(QPid), rs(QueueName)]). + [node(QPid), rs(QueueName)]); + +absent(#amqqueue{name = QueueName}, crashed) -> + protocol_error(not_found, + "~s has crashed and failed to restart", [rs(QueueName)]). type_class(byte) -> int; type_class(short) -> int; diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index d71a68c5b8..fd20e926c4 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -99,7 +99,10 @@ init_declared(Q = #amqqueue{name = QueueName}) -> {new, Fun} -> Q1 = Fun(), rabbit_amqqueue_process:init_declared(new,From, Q1); - {F, _} when F =:= absent; F =:= existing -> + {absent, _, _} -> + gen_server2:reply(From, Decl), + {stop, normal, Q}; + {existing, _} -> gen_server2:reply(From, Decl), {stop, normal, Q} end diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c960fad41d..e858fb3d85 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,8 @@ -module(rabbit_variable_queue). 
--export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, +-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, @@ -510,6 +511,9 @@ delete_and_terminate(_Reason, State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). +delete_crashed(QName) -> + ok = rabbit_queue_index:erase(QName). + purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index cfa3add44a..2c1e15f0b7 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -94,10 +94,10 @@ delete(VHostPath) -> [ok = Fun() || Fun <- Funs], ok. -assert_benign(ok) -> ok; -assert_benign({ok, _}) -> ok; -assert_benign({error, not_found}) -> ok; -assert_benign({error, {absent, Q}}) -> +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q, nodedown}}) -> %% We have a durable queue on a down node. Removing the mnesia %% entries here is safe. If/when the down node restarts, it will %% clear out the on-disk storage of the queue. |
