summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-09 17:48:30 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-09 17:48:30 +0100
commit22880bea834370b13c83efe74760bceca9514401 (patch)
treebfa862d3ad57fdcbd3c110054810fcccbb65d656 /src
parenta1b510bad714e565bb12b95b5952b1793b94dc46 (diff)
downloadrabbitmq-server-git-22880bea834370b13c83efe74760bceca9514401.tar.gz
Support updating decorators correctly when plugins start and stop.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_exchange.erl25
-rw-r--r--src/rabbit_exchange_decorator.erl24
-rw-r--r--src/rabbit_queue_decorator.erl23
4 files changed, 80 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b0b782577b..7a26ddec56 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,7 +30,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
--export([update/2, store_queue/1, policy_changed/2]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -177,6 +177,7 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
@@ -311,13 +312,23 @@ store_queue(Q = #amqqueue{durable = true}) ->
sync_slave_pids = [],
gm_pids = [],
decorators = undefined}, write),
- store_queue0(Q);
+ store_queue_ram(Q);
store_queue(Q = #amqqueue{durable = false}) ->
- store_queue0(Q).
+ store_queue_ram(Q).
-store_queue0(Q) ->
+store_queue_ram(Q) ->
ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
+
policy_changed(Q1 = #amqqueue{decorators = Decorators1},
Q2 = #amqqueue{decorators = Decorators2}) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 350de2a8f5..2e0566b7d8 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -20,7 +20,8 @@
-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
+ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
+ update_scratch/3, update_decorators/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
@@ -61,6 +62,7 @@
-spec(lookup_or_die/1 ::
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:exchange()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(lookup_scratch/2 :: (name(), atom()) ->
rabbit_types:ok(term()) |
@@ -70,6 +72,7 @@
(name(),
fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
-> not_found | rabbit_types:exchange()).
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -107,7 +110,7 @@ recover() ->
end,
fun (X, Tx) ->
X1 = case Tx of
- true -> store0(X);
+ true -> store_ram(X);
false -> rabbit_exchange_decorator:set(X)
end,
callback(X1, create, map_create_tx(Tx), [X1])
@@ -185,11 +188,11 @@ map_create_tx(false) -> none.
store(X = #exchange{durable = true}) ->
mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
write),
- store0(X);
+ store_ram(X);
store(X = #exchange{durable = false}) ->
- store0(X).
+ store_ram(X).
-store0(X) ->
+store_ram(X) ->
X1 = rabbit_exchange_decorator:set(X),
ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
write),
@@ -241,6 +244,8 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
list(VHostPath) ->
@@ -285,6 +290,16 @@ update_scratch(Name, App, Fun) ->
ok
end).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X] -> store_ram(X),
+ ok;
+ [] -> ok
+ end
+ end).
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
[X] -> X1 = Fun(X),
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 2f056b1bfe..900f9c32c8 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([select/2, set/1]).
+-export([select/2, set/1, register/2, unregister/1]).
%% This is like an exchange type except that:
%%
@@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 6205e2dc18..adfe0c7fae 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -2,7 +2,7 @@
-include("rabbit.hrl").
--export([select/1, set/1]).
+-export([select/1, set/1, register/2, unregister/1]).
%%----------------------------------------------------------------------------
@@ -41,3 +41,24 @@ select(Modules) ->
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.