diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-04-24 17:50:00 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-04-24 17:50:00 +0100 |
| commit | 37b9455d6d0a9342216bc29e47a81eb3deb121e0 (patch) | |
| tree | 55fcce41f2a6d7f57e9a5963f9165b5e680937da /src | |
| parent | 42c84b030194497f12cceb805ed1cb127375f041 (diff) | |
| download | rabbitmq-server-git-37b9455d6d0a9342216bc29e47a81eb3deb121e0.tar.gz | |
Rework event serialisation thing so that decorators get a say too. It's a bit fiddly since we have to ensure that just because a decorator wants serials, doesn't mean everything else has to get them...
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_binding.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
4 files changed, 49 insertions, 30 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb44797e4d..f0ea514dcf 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -173,13 +173,11 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - ok = rabbit_exchange:callback( - Src, add_binding, [transaction, Src, B]), + x_callback(transaction, Src, add_binding, B), Serial = rabbit_exchange:serial(Src), fun () -> - ok = rabbit_exchange:callback( - Src, add_binding, [Serial, Src, B]), - ok = rabbit_event:notify(binding_created, info(B)) + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -487,4 +485,5 @@ process_deletions(Deletions) -> del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. -x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). +x_callback(Serial, X, F, Bs) -> + ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 6c82bc7b89..7291da8dcb 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, callback/3, declare/6, +-export([recover/0, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, update_scratch/2, info_keys/0, info/1, info/2, info_all/1, info_all/2, @@ -37,7 +37,9 @@ -type(fun_name() :: atom()). -spec(recover/0 :: () -> [name()]). --spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). +-spec(callback/4:: + (rabbit_types:exchange(), fun_name(), non_neg_integer() | atom(), + [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -76,7 +78,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(serial/1 :: (rabbit_types:exchange()) -> + fun(() -> 'none' | pos_integer())). -spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined'). -endif. @@ -95,21 +98,44 @@ recover() -> true -> store(X); false -> ok end, - rabbit_exchange:callback(X, create, [map_create_tx(Tx), X]) + callback(X, create, map_create_tx(Tx), [X]) end, rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. -callback(#exchange{type = XType}, Fun, Args) -> - callback(type_to_module(XType), Fun, Args); - -callback(Module, Fun, Args) -> +callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> %% TODO cache this? - %% TODO what about serialising events? %% TODO what about sharing the scratch space? - Decorators = rabbit_registry:lookup_all(exchange_decorator), - [ok = apply(M, Fun, Args) || {_, M} <- Decorators], - apply(Module, Fun, Args). + Serial = fun (Bool) -> + case Serial0 of + _ when is_atom(Serial0) -> Serial0; + _ -> Serial0(Bool) + end + end, + [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) + || M <- decorators()], + Module = type_to_module(XType), + apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). + +serialise_events(X = #exchange{type = Type}) -> + case [Serialise || M <- decorators(), + Serialise <- [M:serialise_events(X)], + Serialise == true] of + [] -> (type_to_module(Type)):serialise_events(); + _ -> true + end. + +serial(#exchange{name = XName} = X) -> + Serial = case serialise_events(X) of + true -> next_serial(XName); + false -> none + end, + fun (true) -> Serial; + (false) -> none + end. + +decorators() -> + [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, @@ -137,7 +163,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = callback(XT, create, [map_create_tx(Tx), Exchange]), + ok = callback(X, create, map_create_tx(Tx), [Exchange]), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -149,9 +175,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X = #exchange{name = Name, type = Type}) -> +store(X = #exchange{name = Name}) -> ok = mnesia:write(rabbit_exchange, X, write), - case (type_to_module(Type)):serialise_events() of + case serialise_events(X) of true -> S = #exchange_serial{name = Name, next = 1}, ok = mnesia:write(rabbit_exchange_serial, S, write); false -> ok @@ -349,12 +375,6 @@ unconditional_delete(X = #exchange{name = XName}) -> Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -serial(#exchange{name = XName, type = Type}) -> - case (type_to_module(Type)):serialise_events() of - true -> next_serial(XName); - false -> none - end. - next_serial(XName) -> [#exchange_serial{next = Serial}] = mnesia:read(rabbit_exchange_serial, XName, write), diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 362ec30964..4fa87485bb 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -26,7 +26,7 @@ behaviour_info(callbacks) -> %% delivered to an individual exchange can be serialised? (they %% might still be delivered out of order, but there'll be a %% serial number). - {serialise_events, 0}, + {serialise_events, 1}, {route, 2}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 35bf1012ac..a502fa95c3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -617,8 +617,8 @@ test_topic_matching() -> exchange_op_callback(X, Fun, Args) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end), - rabbit_exchange:callback(X, Fun, [none, X] ++ Args). + fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end), + rabbit_exchange:callback(X, Fun, none, [X] ++ Args). test_topic_expect_match(X, List) -> lists:foreach( |
