summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-18 16:15:39 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-18 16:15:39 +0000
commit9839718a3ae169c4a1c6cb19f0d5674d68cf9f63 (patch)
treeaaaba05016aff078a50ffd73ea7f1a8c67527078
parent97b22cf0fb6678c13e6b3ddb922a5c79f0635447 (diff)
downloadrabbitmq-server-git-9839718a3ae169c4a1c6cb19f0d5674d68cf9f63.tar.gz
I think this is tidier.
-rw-r--r--src/rabbit_binding.erl75
1 files changed, 44 insertions, 31 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 9aacfaa45e..d2767d158d 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -124,7 +124,11 @@ add(Binding, InnerFun) ->
case mnesia:read({rabbit_route, B}) of
[] -> ok = sync_binding(B, all_durable([Src, Dst]),
fun mnesia:write/3),
- fun (Tx) -> process_addition(Src, B, Tx) end;
+ fun (Tx) ->
+ ok = rabbit_exchange:callback(
+ Src, add_binding, [Tx, Src, B]),
+ process_addition(Src, B, Tx)
+ end;
[_] -> fun rabbit_misc:const_ok/1
end;
{error, _} = Err ->
@@ -399,49 +403,58 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_addition(Src, B, State) ->
- Serial = serial(Src, State, fun (_, S) -> S end),
- Tx = State =:= transaction,
- Arg = case Tx of true -> transaction; _ -> Serial end,
- ok = rabbit_exchange:callback(Src, add_binding, [Arg, Src, B]),
- rabbit_event:notify_if(not Tx, binding_created, info(B)),
- case Tx of true -> Serial; false -> ok end.
-
-process_deletions(Deletions, State) ->
- Tx = State =:= transaction,
- Next =
+process_addition(Src, _B, transaction) ->
+ serial(Src);
+
+process_addition(_Src, B, _Serial) ->
+ ok = rabbit_event:notify(binding_created, info(B)).
+
+process_deletions(Deletions, transaction) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(transaction, remove_bindings, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ fun rabbit_misc:const_ok/1,
+ Deletions, dict:new(), true);
+
+process_deletions(Deletions, Serials) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(dict:fetch(X, Serials), remove_bindings, X, Bindings),
+ Acc
+ end,
+ fun (X) ->
+ rabbit_event:notify(exchange_deleted, [{name, X#exchange.name}])
+ end,
+ Deletions, ok, false).
+
+process_deletions(NotDeletedFun, DeletedFun, Deletions, Acc0, Tx) ->
dict:fold(
- fun (_XName, {X, Deleted, Bindings}, Serials) ->
+ fun (_XName, {X, Deleted, Bindings}, Acc) ->
FlatBindings = lists:flatten(Bindings),
[rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
B <- FlatBindings],
case Deleted of
not_deleted ->
- Serial = serial(X, State, fun dict:fetch/2),
- Arg = case Tx of true -> transaction; _ -> Serial end,
- ok = rabbit_exchange:callback(X, remove_bindings,
- [Arg, X, FlatBindings]),
- dict:store(X, Serial, Serials);
+ NotDeletedFun(X, FlatBindings, Acc);
deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- ok = rabbit_exchange:callback(X, delete,
- [Tx, X, FlatBindings]),
- Serials
+ DeletedFun(X),
+ pd_callback(Tx, delete, X, Bindings),
+ Acc
end
- end, dict:new(), Deletions),
- case Tx of true -> Next; false -> ok end.
+ end, Acc0, Deletions).
-serial(X, State, Fun) ->
+pd_callback(Arg, CB, X, Bindings) ->
+ ok = rabbit_exchange:callback(X, CB, [Arg, X, Bindings]).
+
+serial(X) ->
case rabbit_exchange:callback(X, serialise_events, []) of
- true -> case State of
- transaction -> incr_serial(X);
- _ -> Fun(X, State)
- end;
+ true -> next_serial(X);
false -> none
end.
-incr_serial(#exchange{name = Name}) ->
+next_serial(#exchange{name = Name}) ->
Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of
[] -> 0;
[#exchange_serial{serial = S}] -> S