summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Simon MacMullen <simon@rabbitmq.com> 2014-09-09 13:36:06 +0100
committer: Simon MacMullen <simon@rabbitmq.com> 2014-09-09 13:36:06 +0100
commit: 5a87bdd60e32f8cdc62796f51f98f452e8f7ad99 (patch)
tree: bf57fa0e0d9c047a8d0ec9ed43bd7762ee4dcdc2 /src
parent: c979b3ddf5edb70cb9c2fd849c7af2edee655cdd (diff)
download: rabbitmq-server-git-5a87bdd60e32f8cdc62796f51f98f452e8f7ad99.tar.gz
Introduce the idea that queues can be absent for a reason. The traditional absent reason is 'nodedown' and we have a new reason, 'crashed', for when crash recovery has failed and the supervisor has given up. An absent crashed queue is nearly the same as an absent nodedown queue, but we allow for deleting it since it can't be recovered by bringing a node back up.
Currently absent crashed queues are not handled properly by mgmt (they appear to still be there); we might defer that to bug 26151.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl61
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_binding.erl6
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_prequeue.erl5
-rw-r--r--src/rabbit_variable_queue.erl6
-rw-r--r--src/rabbit_vhost.erl8
8 files changed, 84 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0ad6af51e4..4906b252b6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,7 +17,9 @@
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5, declare/6,
- delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
+ delete_immediately/1, delete/3,
+ delete_crashed/1, delete_crashed_internal/1,
+ purge/1, forget_all_durable/1]).
-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -49,7 +51,7 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, absent_reason/0]).
-type(name() :: rabbit_types:r('queue')).
-type(qpids() :: [pid()]).
@@ -59,8 +61,9 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(not_found_or_absent() :: 'not_found' |
- {'absent', rabbit_types:amqqueue()}).
+-type(absent_reason() :: 'nodedown' | 'crashed').
+-type(not_found_or_absent() ::
+ 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}).
-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
@@ -72,8 +75,9 @@
-spec(declare/6 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
- -> {'new' | 'existing' | 'absent' | 'owner_died',
- rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
+ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
+ {'absent', rabbit_types:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit()).
-spec(internal_declare/1 ::
(rabbit_types:amqqueue())
-> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} |
@@ -137,6 +141,8 @@
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
+-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -274,10 +280,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
internal_declare(Q = #amqqueue{name = QueueName}) ->
case not_found_or_absent(QueueName) of
- not_found -> ok = store_queue(Q),
- B = add_default_binding(Q),
- {new, fun () -> B(), Q end};
- {absent, _Q} = R -> R
+ %% Nothing is recorded under this name: store the queue and return
+ %% a thunk that calls the fun produced by add_default_binding/1
+ %% and then yields Q.
+ not_found -> ok = store_queue(Q),
+ B = add_default_binding(Q),
+ {new, fun () -> B(), Q end};
+ %% The queue is recorded but absent. The recorded queue must not be
+ %% matched against the already-bound Q (the two records may
+ %% differ, which would raise case_clause), so it is ignored here
+ %% and the whole {absent, _, _} result is passed through.
+ {absent, _Q, _Reason} = R -> R
end.
update(Name, Fun) ->
@@ -349,7 +355,7 @@ not_found_or_absent(Name) ->
%% rabbit_queue and not found anything
case mnesia:read({rabbit_durable_queue, Name}) of
[] -> not_found;
- [Q] -> {absent, Q} %% Q exists on stopped node
+ [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
end.
not_found_or_absent_dirty(Name) ->
@@ -358,7 +364,7 @@ not_found_or_absent_dirty(Name) ->
%% and only affect the error kind.
case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
{error, not_found} -> not_found;
- {ok, Q} -> {absent, Q}
+ {ok, Q} -> {absent, Q, nodedown}
end.
with(Name, F, E) ->
@@ -372,8 +378,11 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- timer:sleep(25),
- with(Name, F, E)
+ case crashed_or_recovering(Q) of
+ crashed -> E({absent, Q, crashed});
+ recovering -> timer:sleep(25),
+ with(Name, F, E)
+ end
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -382,10 +391,22 @@ with(Name, F, E) ->
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
- with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
+%% Classify a queue whose process is not alive.
+%%
+%% A queue with no slave pids whose home node is this node or one of
+%% the nodes we are connected to is reported as 'crashed'; every other
+%% queue is reported as 'recovering'.
+%%
+%% TODO this can still give the wrong answer if we are called in the
+%% middle of a crash-failover. Looking for the supervisor would tell
+%% us whether that is happening, but we would need some additional
+%% book keeping to know which supervisor that is...
+crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
+    HomeNode = node(QPid),
+    KnownNodes = [node() | nodes()],
+    case lists:member(HomeNode, KnownNodes) of
+        false -> recovering;
+        true  -> crashed
+    end;
+crashed_or_recovering(_Q) ->
+    recovering.
+
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
@@ -546,6 +567,14 @@ delete_immediately(QPids) ->
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate:call(QPid, {delete, IfUnused, IfEmpty}).
+%% Delete a crashed queue by running delete_crashed_internal/1 on the
+%% node of the queue's pid. The result is asserted to be 'ok' so that
+%% an rpc failure ({badrpc, _}) is not handed back to a caller that
+%% would treat it as a successful delete; this also makes the function
+%% honour its 'ok' return spec.
+delete_crashed(#amqqueue{ pid = QPid } = Q) ->
+    ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
+
+%% Remove what is left of a crashed queue: ask the configured backing
+%% queue module to clean up for QName, then delete the queue via
+%% internal_delete/1. Invoked through rpc by delete_crashed/1.
+delete_crashed_internal(#amqqueue{ name = QName }) ->
+    %% Name the application explicitly. get_env/1 looks the key up in
+    %% the application of the calling process, and since this function
+    %% is entered via rpc the caller is not guaranteed to belong to
+    %% the rabbit application.
+    {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+    BQ:delete_crashed(QName),
+    ok = internal_delete(QName).
+
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 098f5f4342..310b82207d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -85,6 +85,10 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
+%% Called to clean up after a crashed queue. In this case we don't
+%% have a process and thus a state(), we are just removing on-disk data.
+-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'.
+
%% Remove all 'fetchable' messages from the queue, i.e. all messages
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index d887f26a45..12082af8ec 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) ->
absent_errs_only(Names) ->
Errs = [E || Name <- Names,
- {absent, _Q} = E <- [not_found_or_absent(Name)]],
+ {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]],
rabbit_misc:const(case Errs of
[] -> ok;
_ -> {error, {resources_missing, Errs}}
@@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) ->
{not_found, Name};
not_found_or_absent(#resource{kind = queue} = Name) ->
case rabbit_amqqueue:not_found_or_absent(Name) of
- not_found -> {not_found, Name};
- {absent, _Q} = R -> R
+ not_found -> {not_found, Name};
+ {absent, _Q, _Reason} = R -> R
end.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e5a90410e1..fc43389810 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% must have been created between the stat and the
%% declare. Loop around again.
handle_method(Declare, none, State);
- {absent, Q} ->
- rabbit_misc:absent(Q);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
{owner_died, _Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
end;
- {error, {absent, Q}} ->
- rabbit_misc:absent(Q)
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
end,
- fun (not_found) -> {ok, 0};
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
@@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end) of
{error, {resources_missing, [{not_found, Name} | _]}} ->
rabbit_misc:not_found(Name);
- {error, {resources_missing, [{absent, Q} | _]}} ->
- rabbit_misc:absent(Q);
+ {error, {resources_missing, [{absent, Q, Reason} | _]}} ->
+ rabbit_misc:absent(Q, Reason);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c4148bbfc5..cf3a2edbaa 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, absent/1]).
+-export([not_found/1, absent/2]).
-export([type_class/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
@@ -119,7 +119,8 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
--spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
+-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason())
+ -> rabbit_types:channel_exit()).
-spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
@@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
-absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+%% Raise a not_found protocol error for a queue that is recorded but
+%% absent. The second argument is the absent reason, 'nodedown' or
+%% 'crashed', and selects the error message.
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) ->
%% The assertion of durability is mainly there because we mention
%% durability in the error message. That way we will hopefully
%% notice if at some future point our logic changes s.t. we get
%% here with non-durable queues.
protocol_error(not_found,
"home node '~s' of durable ~s is down or inaccessible",
- [node(QPid), rs(QueueName)]).
+ [node(QPid), rs(QueueName)]);
+
+%% No durability assertion here: the message does not mention it.
+absent(#amqqueue{name = QueueName}, crashed) ->
+ protocol_error(not_found,
+ "~s has crashed and failed to restart", [rs(QueueName)]).
type_class(byte) -> int;
type_class(short) -> int;
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index d71a68c5b8..fd20e926c4 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -99,7 +99,10 @@ init_declared(Q = #amqqueue{name = QueueName}) ->
{new, Fun} ->
Q1 = Fun(),
rabbit_amqqueue_process:init_declared(new,From, Q1);
- {F, _} when F =:= absent; F =:= existing ->
+ {absent, _, _} ->
+ gen_server2:reply(From, Decl),
+ {stop, normal, Q};
+ {existing, _} ->
gen_server2:reply(From, Decl),
{stop, normal, Q}
end
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c960fad41d..e858fb3d85 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,8 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
@@ -510,6 +511,9 @@ delete_and_terminate(_Reason, State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
+%% Clean up after a crashed queue, given only its name: erase the
+%% queue index for QName, insisting that the erase reports 'ok'.
+delete_crashed(QName) ->
+    ok = rabbit_queue_index:erase(QName),
+    ok.
+
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index cfa3add44a..2c1e15f0b7 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -94,10 +94,10 @@ delete(VHostPath) ->
[ok = Fun() || Fun <- Funs],
ok.
-assert_benign(ok) -> ok;
-assert_benign({ok, _}) -> ok;
-assert_benign({error, not_found}) -> ok;
-assert_benign({error, {absent, Q}}) ->
+assert_benign(ok) -> ok;
+assert_benign({ok, _}) -> ok;
+assert_benign({error, not_found}) -> ok;
+assert_benign({error, {absent, Q, nodedown}}) ->
%% We have a durable queue on a down node. Removing the mnesia
%% entries here is safe. If/when the down node restarts, it will
%% clear out the on-disk storage of the queue.