diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-09 17:48:30 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-09 17:48:30 +0100 |
| commit | 22880bea834370b13c83efe74760bceca9514401 (patch) | |
| tree | bfa862d3ad57fdcbd3c110054810fcccbb65d656 | |
| parent | a1b510bad714e565bb12b95b5952b1793b94dc46 (diff) | |
| download | rabbitmq-server-git-22880bea834370b13c83efe74760bceca9514401.tar.gz | |
Support updating decorators correctly when plugins start and stop.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_queue_decorator.erl | 23 |
4 files changed, 80 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b0b782577b..7a26ddec56 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). --export([update/2, store_queue/1, policy_changed/2]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -177,6 +177,7 @@ -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). @@ -311,13 +312,23 @@ store_queue(Q = #amqqueue{durable = true}) -> sync_slave_pids = [], gm_pids = [], decorators = undefined}, write), - store_queue0(Q); + store_queue_ram(Q); store_queue(Q = #amqqueue{durable = false}) -> - store_queue0(Q). + store_queue_ram(Q). -store_queue0(Q) -> +store_queue_ram(Q) -> ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). + policy_changed(Q1 = #amqqueue{decorators = Decorators1}, Q2 = #amqqueue{decorators = Decorators2}) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 350de2a8f5..2e0566b7d8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -20,7 +20,8 @@ -export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, + lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, + update_scratch/3, update_decorators/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx @@ -61,6 +62,7 @@ -spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). +-spec(list/0 :: () -> [rabbit_types:exchange()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). -spec(lookup_scratch/2 :: (name(), atom()) -> rabbit_types:ok(term()) | @@ -70,6 +72,7 @@ (name(), fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> not_found | rabbit_types:exchange()). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -107,7 +110,7 @@ recover() -> end, fun (X, Tx) -> X1 = case Tx of - true -> store0(X); + true -> store_ram(X); false -> rabbit_exchange_decorator:set(X) end, callback(X1, create, map_create_tx(Tx), [X1]) @@ -185,11 +188,11 @@ map_create_tx(false) -> none. store(X = #exchange{durable = true}) -> mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined}, write), - store0(X); + store_ram(X); store(X = #exchange{durable = false}) -> - store0(X). + store_ram(X). -store0(X) -> +store_ram(X) -> X1 = rabbit_exchange_decorator:set(X), ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1), write), @@ -241,6 +244,8 @@ lookup_or_die(Name) -> {error, not_found} -> rabbit_misc:not_found(Name) end. +list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> @@ -285,6 +290,16 @@ update_scratch(Name, App, Fun) -> ok end). +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X] -> store_ram(X), + ok; + [] -> ok + end + end). + update(Name, Fun) -> case mnesia:wread({rabbit_exchange, Name}) of [X] -> X1 = Fun(X), diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 2f056b1bfe..900f9c32c8 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([select/2, set/1]). +-export([select/2, set/1, register/2, unregister/1]). %% This is like an exchange type except that: %% @@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. cons_if_eq(Select, Select, Item, List) -> [Item | List]; cons_if_eq(_Select, _Other, _Item, List) -> List. + +register(TypeName, ModuleName) -> + rabbit_registry:register(exchange_decorator, TypeName, ModuleName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(exchange_decorator, TypeName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +maybe_recover(X = #exchange{name = Name, + decorators = Decs}) -> + #exchange{decorators = Decs1} = set(X), + Old = lists:sort(select(all, Decs)), + New = lists:sort(select(all, Decs1)), + case New of + Old -> ok; + _ -> %% TODO create a tx here for non-federation decorators + [M:create(none, X) || M <- New -- Old], + rabbit_exchange:update_decorators(Name) + end. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 6205e2dc18..adfe0c7fae 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -2,7 +2,7 @@ -include("rabbit.hrl"). --export([select/1, set/1]). +-export([select/1, set/1, register/2, unregister/1]). %%---------------------------------------------------------------------------- @@ -41,3 +41,24 @@ select(Modules) -> set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. + +register(TypeName, ModuleName) -> + rabbit_registry:register(queue_decorator, TypeName, ModuleName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(queue_decorator, TypeName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +maybe_recover(Q = #amqqueue{name = Name, + decorators = Decs}) -> + #amqqueue{decorators = Decs1} = set(Q), + Old = lists:sort(select(Decs)), + New = lists:sort(select(Decs1)), + case New of + Old -> ok; + _ -> [M:startup(Q) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) + end. |
