summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl61
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_binding.erl6
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_prequeue.erl5
-rw-r--r--src/rabbit_variable_queue.erl6
-rw-r--r--src/rabbit_vhost.erl8
8 files changed, 84 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0ad6af51e4..4906b252b6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,7 +17,9 @@
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5, declare/6,
- delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
+ delete_immediately/1, delete/3,
+ delete_crashed/1, delete_crashed_internal/1,
+ purge/1, forget_all_durable/1]).
-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -49,7 +51,7 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, absent_reason/0]).
-type(name() :: rabbit_types:r('queue')).
-type(qpids() :: [pid()]).
@@ -59,8 +61,9 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(not_found_or_absent() :: 'not_found' |
- {'absent', rabbit_types:amqqueue()}).
+-type(absent_reason() :: 'nodedown' | 'crashed').
+-type(not_found_or_absent() ::
+ 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}).
-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok').
@@ -72,8 +75,9 @@
-spec(declare/6 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
- -> {'new' | 'existing' | 'absent' | 'owner_died',
- rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
+ -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
+ {'absent', rabbit_types:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit()).
-spec(internal_declare/1 ::
(rabbit_types:amqqueue())
-> {'new', rabbit_misc:thunk(rabbit_types:amqqueue())} |
@@ -137,6 +141,8 @@
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
+-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -274,10 +280,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
+%% Declares Q unless a queue of that name is already known as absent.
+%% When not_found_or_absent/1 reports not_found, Q is stored, its default
+%% binding is added, and {new, Thunk} is returned, where Thunk runs the
+%% binding callback and yields Q. An {absent, _, _} result is returned
+%% unchanged.
internal_declare(Q = #amqqueue{name = QueueName}) ->
case not_found_or_absent(QueueName) of
- not_found -> ok = store_queue(Q),
- B = add_default_binding(Q),
- {new, fun () -> B(), Q end};
- {absent, _Q} = R -> R
+ not_found -> ok = store_queue(Q),
+ B = add_default_binding(Q),
+ {new, fun () -> B(), Q end};
+ %% The stored record is matched as _Q: Q is already bound by the
+ %% function head, so reusing it here would only match a record
+ %% identical to the one being declared and otherwise raise
+ %% case_clause.
+ {absent, _Q, _Reason} = R -> R
end.
update(Name, Fun) ->
@@ -349,7 +355,7 @@ not_found_or_absent(Name) ->
%% rabbit_queue and not found anything
case mnesia:read({rabbit_durable_queue, Name}) of
[] -> not_found;
- [Q] -> {absent, Q} %% Q exists on stopped node
+ [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
end.
not_found_or_absent_dirty(Name) ->
@@ -358,7 +364,7 @@ not_found_or_absent_dirty(Name) ->
%% and only affect the error kind.
case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
{error, not_found} -> not_found;
- {ok, Q} -> {absent, Q}
+ {ok, Q} -> {absent, Q, nodedown}
end.
with(Name, F, E) ->
@@ -372,8 +378,11 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- timer:sleep(25),
- with(Name, F, E)
+ case crashed_or_recovering(Q) of
+ crashed -> E({absent, Q, crashed});
+ recovering -> timer:sleep(25),
+ with(Name, F, E)
+ end
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -382,10 +391,22 @@ with(Name, F, E) ->
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+%% Same lookup as with/3, but failures are converted into protocol
+%% errors: not_found goes to rabbit_misc:not_found/1 and
+%% {absent, Q, Reason} goes to rabbit_misc:absent/2.
with_or_die(Name, F) ->
- with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
+%% Classifies a queue for with/3 once its process was found not to be
+%% alive. A queue with no slave pids whose pid belongs to this node or
+%% to a currently connected node is reported as 'crashed'; every other
+%% case is reported as 'recovering'.
+%%
+%% TODO we could still be wrong here if we happen to call in the
+%% middle of a crash-failover. We could try to figure out whether
+%% that's happening by looking for the supervisor - but we'd need some
+%% additional bookkeeping to know what it is...
+crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
+ case lists:member(node(QPid), [node() | nodes()]) of
+ true -> crashed;
+ false -> recovering
+ end;
+crashed_or_recovering(_Q) ->
+ recovering.
+
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
@@ -546,6 +567,14 @@ delete_immediately(QPids) ->
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate:call(QPid, {delete, IfUnused, IfEmpty}).
+%% Deletes a crashed queue by running delete_crashed_internal/1 on the
+%% node of the queue's recorded pid. The result is asserted to be ok so
+%% that a {badrpc, _} reply surfaces as a badmatch instead of being
+%% returned in place of the 'ok' the spec promises.
+delete_crashed(#amqqueue{ pid = QPid } = Q) ->
+ ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
+
+%% Asks the configured backing queue module to clean up after the
+%% crashed queue QName, then calls internal_delete/1 and asserts ok.
+%% The application name is passed explicitly: this function is invoked
+%% through rpc:call/4, and application:get_env/1 resolves the
+%% application from the calling process, which is not reliable for an
+%% rpc-spawned process.
+delete_crashed_internal(#amqqueue{ name = QName }) ->
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ BQ:delete_crashed(QName),
+ ok = internal_delete(QName).
+
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 098f5f4342..310b82207d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -85,6 +85,10 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
+%% Called to clean up after a crashed queue. In this case we don't
+%% have a process and thus a state(), we are just removing on-disk data.
+-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'.
+
%% Remove all 'fetchable' messages from the queue, i.e. all messages
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index d887f26a45..12082af8ec 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) ->
absent_errs_only(Names) ->
Errs = [E || Name <- Names,
- {absent, _Q} = E <- [not_found_or_absent(Name)]],
+ {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]],
rabbit_misc:const(case Errs of
[] -> ok;
_ -> {error, {resources_missing, Errs}}
@@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) ->
{not_found, Name};
not_found_or_absent(#resource{kind = queue} = Name) ->
case rabbit_amqqueue:not_found_or_absent(Name) of
- not_found -> {not_found, Name};
- {absent, _Q} = R -> R
+ not_found -> {not_found, Name};
+ {absent, _Q, _Reason} = R -> R
end.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e5a90410e1..fc43389810 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% must have been created between the stat and the
%% declare. Loop around again.
handle_method(Declare, none, State);
- {absent, Q} ->
- rabbit_misc:absent(Q);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
{owner_died, _Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
end;
- {error, {absent, Q}} ->
- rabbit_misc:absent(Q)
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
end,
- fun (not_found) -> {ok, 0};
- ({absent, Q}) -> rabbit_misc:absent(Q)
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
@@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end) of
{error, {resources_missing, [{not_found, Name} | _]}} ->
rabbit_misc:not_found(Name);
- {error, {resources_missing, [{absent, Q} | _]}} ->
- rabbit_misc:absent(Q);
+ {error, {resources_missing, [{absent, Q, Reason} | _]}} ->
+ rabbit_misc:absent(Q, Reason);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c4148bbfc5..cf3a2edbaa 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, absent/1]).
+-export([not_found/1, absent/2]).
-export([type_class/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
@@ -119,7 +119,8 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
--spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
+-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason())
+ -> rabbit_types:channel_exit()).
-spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
@@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
-absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+%% Signals a not_found protocol error explaining why the queue is
+%% absent. The second argument is a rabbit_amqqueue:absent_reason(),
+%% i.e. 'nodedown' or 'crashed'.
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) ->
%% The assertion of durability is mainly there because we mention
%% durability in the error message. That way we will hopefully
%% notice if at some future point our logic changes s.t. we get
%% here with non-durable queues.
protocol_error(not_found,
"home node '~s' of durable ~s is down or inaccessible",
- [node(QPid), rs(QueueName)]).
+ [node(QPid), rs(QueueName)]);
+
+absent(#amqqueue{name = QueueName}, crashed) ->
+ protocol_error(not_found,
+ "~s has crashed and failed to restart", [rs(QueueName)]).
type_class(byte) -> int;
type_class(short) -> int;
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index d71a68c5b8..fd20e926c4 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -99,7 +99,10 @@ init_declared(Q = #amqqueue{name = QueueName}) ->
{new, Fun} ->
Q1 = Fun(),
rabbit_amqqueue_process:init_declared(new,From, Q1);
- {F, _} when F =:= absent; F =:= existing ->
+ {absent, _, _} ->
+ gen_server2:reply(From, Decl),
+ {stop, normal, Q};
+ {existing, _} ->
gen_server2:reply(From, Decl),
{stop, normal, Q}
end
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c960fad41d..e858fb3d85 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,8 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
@@ -510,6 +511,9 @@ delete_and_terminate(_Reason, State) ->
a(State2 #vqstate { index_state = IndexState1,
msg_store_clients = undefined }).
+%% Backing queue callback for a crashed queue: erases the queue index
+%% of QName through rabbit_queue_index:erase/1 and asserts that the
+%% call returned ok.
+delete_crashed(QName) ->
+ ok = rabbit_queue_index:erase(QName).
+
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index cfa3add44a..2c1e15f0b7 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -94,10 +94,10 @@ delete(VHostPath) ->
[ok = Fun() || Fun <- Funs],
ok.
-assert_benign(ok) -> ok;
-assert_benign({ok, _}) -> ok;
-assert_benign({error, not_found}) -> ok;
-assert_benign({error, {absent, Q}}) ->
+assert_benign(ok) -> ok;
+assert_benign({ok, _}) -> ok;
+assert_benign({error, not_found}) -> ok;
+assert_benign({error, {absent, Q, nodedown}}) ->
%% We have a durable queue on a down node. Removing the mnesia
%% entries here is safe. If/when the down node restarts, it will
%% clear out the on-disk storage of the queue.