summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-04-24 17:50:00 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-04-24 17:50:00 +0100
commit37b9455d6d0a9342216bc29e47a81eb3deb121e0 (patch)
tree55fcce41f2a6d7f57e9a5963f9165b5e680937da /src
parent42c84b030194497f12cceb805ed1cb127375f041 (diff)
downloadrabbitmq-server-git-37b9455d6d0a9342216bc29e47a81eb3deb121e0.tar.gz
Rework event serialisation thing so that decorators get a say too. It's a bit fiddly since we have to ensure that just because a decorator wants serials, doesn't mean everything else has to get them...
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_binding.erl11
-rw-r--r--src/rabbit_exchange.erl62
-rw-r--r--src/rabbit_exchange_decorator.erl2
-rw-r--r--src/rabbit_tests.erl4
4 files changed, 49 insertions, 30 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb44797e4d..f0ea514dcf 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -173,13 +173,11 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- ok = rabbit_exchange:callback(
- Src, add_binding, [transaction, Src, B]),
+ x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
fun () ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Serial, Src, B]),
- ok = rabbit_event:notify(binding_created, info(B))
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -487,4 +485,5 @@ process_deletions(Deletions) ->
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
-x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 6c82bc7b89..7291da8dcb 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, callback/3, declare/6,
+-export([recover/0, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1, update_scratch/2,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
@@ -37,7 +37,9 @@
-type(fun_name() :: atom()).
-spec(recover/0 :: () -> [name()]).
--spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
+-spec(callback/4::
+ (rabbit_types:exchange(), fun_name(), non_neg_integer() | atom(),
+ [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -76,7 +78,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/1 :: (rabbit_types:exchange()) ->
+ fun(() -> 'none' | pos_integer())).
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
@@ -95,21 +98,44 @@ recover() ->
true -> store(X);
false -> ok
end,
- rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
+ callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(#exchange{type = XType}, Fun, Args) ->
- callback(type_to_module(XType), Fun, Args);
-
-callback(Module, Fun, Args) ->
+callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
%% TODO cache this?
- %% TODO what about serialising events?
%% TODO what about sharing the scratch space?
- Decorators = rabbit_registry:lookup_all(exchange_decorator),
- [ok = apply(M, Fun, Args) || {_, M} <- Decorators],
- apply(Module, Fun, Args).
+ Serial = fun (Bool) ->
+ case Serial0 of
+ _ when is_atom(Serial0) -> Serial0;
+ _ -> Serial0(Bool)
+ end
+ end,
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
+ || M <- decorators()],
+ Module = type_to_module(XType),
+ apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+
+serialise_events(X = #exchange{type = Type}) ->
+ case [Serialise || M <- decorators(),
+ Serialise <- [M:serialise_events(X)],
+ Serialise == true] of
+ [] -> (type_to_module(Type)):serialise_events();
+ _ -> true
+ end.
+
+serial(#exchange{name = XName} = X) ->
+ Serial = case serialise_events(X) of
+ true -> next_serial(XName);
+ false -> none
+ end,
+ fun (true) -> Serial;
+ (false) -> none
+ end.
+
+decorators() ->
+ [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
@@ -137,7 +163,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = callback(XT, create, [map_create_tx(Tx), Exchange]),
+ ok = callback(X, create, map_create_tx(Tx), [Exchange]),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -149,9 +175,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X = #exchange{name = Name, type = Type}) ->
+store(X = #exchange{name = Name}) ->
ok = mnesia:write(rabbit_exchange, X, write),
- case (type_to_module(Type)):serialise_events() of
+ case serialise_events(X) of
true -> S = #exchange_serial{name = Name, next = 1},
ok = mnesia:write(rabbit_exchange_serial, S, write);
false -> ok
@@ -349,12 +375,6 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-serial(#exchange{name = XName, type = Type}) ->
- case (type_to_module(Type)):serialise_events() of
- true -> next_serial(XName);
- false -> none
- end.
-
next_serial(XName) ->
[#exchange_serial{next = Serial}] =
mnesia:read(rabbit_exchange_serial, XName, write),
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 362ec30964..4fa87485bb 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -26,7 +26,7 @@ behaviour_info(callbacks) ->
%% delivered to an individual exchange can be serialised? (they
%% might still be delivered out of order, but there'll be a
%% serial number).
- {serialise_events, 0},
+ {serialise_events, 1},
{route, 2},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 35bf1012ac..a502fa95c3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -617,8 +617,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, Args) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end),
- rabbit_exchange:callback(X, Fun, [none, X] ++ Args).
+ fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
+ rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
test_topic_expect_match(X, List) ->
lists:foreach(