summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-15 17:14:39 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-15 17:14:39 +0100
commit3b12cf2d9698515d627d4951ab5527260ca65731 (patch)
treedb02a0d12bac619e6f896aad3a99864eed9ba15c /src
parent9f7cb18cb4c5484ad8fba98868db000a3c9d0185 (diff)
downloadrabbitmq-server-git-3b12cf2d9698515d627d4951ab5527260ca65731.tar.gz
Two modes for delete, one for when the server is running and we need to be quick going via the reverse route, and another when it is down and we need to clean up durable routes. Not entirely happy with the elegance of this but it gets the tests to pass.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_binding.erl58
-rw-r--r--src/rabbit_exchange.erl27
3 files changed, 53 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1aba7ecbd1..e45e026e4c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -612,7 +612,7 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-internal_delete1(QueueName) ->
+internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
@@ -622,7 +622,7 @@ internal_delete1(QueueName) ->
end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_destination(QueueName).
+ rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -632,7 +632,7 @@ internal_delete(QueueName) ->
{[], []} ->
rabbit_misc:const({error, not_found});
_ ->
- Deletions = internal_delete1(QueueName),
+ Deletions = internal_delete1(QueueName, false),
T = rabbit_binding:process_deletions(Deletions),
fun() ->
ok = T(),
@@ -651,7 +651,7 @@ forget_all_durable(Node) ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
[rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
+ internal_delete1(Name, true)) ||
#amqqueue{name = Name, pid = Pid} = Q <- Qs,
node(Pid) =:= Node,
rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7a095e068b..d887f26a45 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -25,7 +25,7 @@
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
- remove_for_destination/1, remove_transient_for_destination/1]).
+ remove_for_destination/2, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -78,8 +78,8 @@
-> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
--spec(remove_for_destination/1 ::
- (rabbit_types:binding_destination()) -> deletions()).
+-spec(remove_for_destination/2 ::
+ (rabbit_types:binding_destination(), boolean()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
@@ -215,7 +215,8 @@ remove(Binding, InnerFun) ->
remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
- Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
+ Deletions = maybe_auto_delete(
+ B#binding.source, [B], new_deletions(), false),
process_deletions(Deletions).
list(VHostPath) ->
@@ -298,11 +299,11 @@ remove_for_source(SrcName) ->
mnesia:match_object(rabbit_route, Match, write) ++
mnesia:match_object(rabbit_semi_durable_route, Match, write))).
-remove_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_routes/1).
+remove_for_destination(DstName, OnlyDurable) ->
+ remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
remove_transient_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_transient_routes/1).
+ remove_for_destination(DstName, false, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -428,36 +429,47 @@ remove_transient_routes(Routes) ->
R#route.binding
end || R <- Routes].
-remove_for_destination(DstName, Fun) ->
+remove_for_destination(DstName, OnlyDurable, Fun) ->
lock_route_tables(),
- Match = reverse_route(
- #route{binding = #binding{destination = DstName, _ = '_'}}),
- Routes = [reverse_route(R) || R <- mnesia:match_object(
- rabbit_reverse_route, Match, write)],
+ MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
+ MatchRev = reverse_route(MatchFwd),
+ Routes = case OnlyDurable of
+ false -> [reverse_route(R) ||
+ R <- mnesia:match_object(
+ rabbit_reverse_route, MatchRev, write)];
+ true -> lists:usort(
+ mnesia:match_object(
+ rabbit_durable_route, MatchFwd, write) ++
+ mnesia:match_object(
+ rabbit_semi_durable_route, MatchFwd, write))
+ end,
Bindings = Fun(Routes),
- group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
- lists:keysort(#binding.source, Bindings)).
+ group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
+ lists:keysort(#binding.source, Bindings), OnlyDurable).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_fold(_Fun, Acc, []) ->
+group_bindings_fold(_Fun, Acc, [], _OnlyDurable) ->
Acc;
-group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
group_bindings_fold(
- Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
-group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) ->
%% Either Removed is [], or its head has a non-matching SrcName.
- group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed,
+ OnlyDurable).
-maybe_auto_delete(XName, Bindings, Deletions) ->
+maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case mnesia:read({rabbit_exchange, XName}) of
[] -> {{undefined, not_deleted, Bindings}, Deletions};
- [X] -> case rabbit_exchange:maybe_auto_delete(X) of
+ [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of
not_deleted ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, Deletions2} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4d4a2a5871..685c311f4a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -24,7 +24,7 @@
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
+-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -86,8 +86,8 @@
-spec(validate_binding/2 ::
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
--spec(maybe_auto_delete/1::
- (rabbit_types:exchange())
+-spec(maybe_auto_delete/2::
+ (rabbit_types:exchange(), boolean())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-spec(serial/1 :: (rabbit_types:exchange()) ->
fun((boolean()) -> 'none' | pos_integer())).
@@ -400,13 +400,13 @@ call_with_exchange(XName, Fun) ->
delete(XName, IfUnused) ->
Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
+ true -> fun conditional_delete/2;
+ false -> fun unconditional_delete/2
end,
call_with_exchange(
XName,
fun (X) ->
- case Fun(X) of
+ case Fun(X, false) of
{deleted, X, Bs, Deletions} ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
@@ -420,21 +420,21 @@ validate_binding(X = #exchange{type = XType}, Binding) ->
Module = type_to_module(XType),
Module:validate_binding(X, Binding).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
+maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
not_deleted;
-maybe_auto_delete(#exchange{auto_delete = true} = X) ->
- case conditional_delete(X) of
+maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
+ case conditional_delete(X, OnlyDurable) of
{error, in_use} -> not_deleted;
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-conditional_delete(X = #exchange{name = XName}) ->
+conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
case rabbit_binding:has_for_source(XName) of
- false -> unconditional_delete(X);
+ false -> unconditional_delete(X, OnlyDurable);
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = XName}) ->
+unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
case mnesia:wread({rabbit_durable_exchange, XName}) of
@@ -444,7 +444,8 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
- {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(
+ XName, OnlyDurable)}.
next_serial(XName) ->
Serial = peek_serial(XName, write),