diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-24 10:46:34 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-24 10:46:34 +0000 |
| commit | f8b346795475c63bcf6b9d289bf57af6a77c6ada (patch) | |
| tree | 168d052e62e820cab223e505b08b95df69f2a97f /src | |
| parent | 9b1e99e3c1520798e9b8ca796d7a3bc53bf2ff55 (diff) | |
| parent | d055ecafc5ca06b046fe583ee1d48191cf8bd0d0 (diff) | |
| download | rabbitmq-server-git-f8b346795475c63bcf6b9d289bf57af6a77c6ada.tar.gz | |
Merge from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
12 files changed, 127 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965d7..80dcb79a4a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -214,7 +214,13 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), B = add_default_binding(Q), - fun (Tx) -> B(Tx), Q end; + fun (Tx) -> + R = B(Tx), + case Tx of + transaction -> R; + _ -> Q + end + end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -433,8 +439,8 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> ok = rabbit_binding:process_deletions( - Deletions, Tx) + fun (Tx) -> rabbit_binding:process_deletions( + Deletions, Tx) end end end). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6167790e58..cc7aea33bb 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -127,8 +127,7 @@ add(Binding, InnerFun) -> fun (Tx) -> ok = rabbit_exchange:callback( Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) + process_addition(Src, B, Tx) end; [_] -> fun rabbit_misc:const_ok/1 end; @@ -161,7 +160,7 @@ remove(Binding, InnerFun) -> {error, _} = Err -> rabbit_misc:const(Err); {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + fun (Tx) -> process_deletions(Deletions, Tx) end end end). @@ -405,19 +404,66 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> +process_addition(Src, _B, transaction) -> + serial(Src); + +process_addition(_Src, B, _Serial) -> + ok = rabbit_event:notify(binding_created, info(B)). + +process_deletions(Deletions, transaction) -> + process_deletions( + fun (X, Bindings, Acc) -> + pd_callback(transaction, remove_bindings, X, Bindings), + dict:store(X, serial(X), Acc) + end, + fun (X, Bindings, Acc) -> + pd_callback(transaction, delete, X, Bindings), + dict:store(X, serial(X), Acc) + end, + Deletions, dict:new(), true); + +process_deletions(Deletions, Serials) -> + process_deletions( + fun (X, Bindings, Acc) -> + pd_callback(dict:fetch(X, Serials), remove_bindings, X, Bindings), + Acc + end, + fun (X, Bindings, Acc) -> + pd_callback(dict:fetch(X, Serials), delete, X, Bindings), + rabbit_event:notify(exchange_deleted, [{name, X#exchange.name}]), + Acc + end, + Deletions, ok, false). + +process_deletions(NotDeletedFun, DeletedFun, Deletions, Acc0, Tx) -> dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> + fun (_XName, {X, Deleted, Bindings}, Acc) -> FlatBindings = lists:flatten(Bindings), [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || B <- FlatBindings], case Deleted of not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); + NotDeletedFun(X, FlatBindings, Acc); deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) + DeletedFun(X, FlatBindings, Acc) end - end, ok, Deletions). + end, Acc0, Deletions). + +pd_callback(Arg, CB, X, Bindings) -> + ok = rabbit_exchange:callback(X, CB, [Arg, X, Bindings]). + +serial(X) -> + case rabbit_exchange:callback(X, serialise_events, [X]) of + true -> next_serial(X); + false -> none + end. + +next_serial(#exchange{name = Name}) -> + Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of + [] -> 0; + [#exchange_serial{serial = S}] -> S + end, + Serial = Prev + 1, + mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = Name, serial = Serial}, write), + Serial. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a463e57067..504cf93517 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -72,7 +72,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). +-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> + boolean() | 'ok'). -endif. @@ -126,7 +127,15 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - callback(Exchange, create, [Tx, Exchange]), + S = case Tx of + true -> transaction; + false -> case callback(Exchange, serialise_events, + [Exchange]) of + true -> 0; + false -> none + end + end, + callback(Exchange, create, [S, Exchange]), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -264,12 +273,13 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, [QName | QNames]}. call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end - end, PrePostCommitFun). + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of + [] -> {error, not_found}; + [X] -> Fun(X) + end, + fun(Tx) -> PrePostCommitFun(Result, Tx) end + end). delete(XName, IfUnused) -> call_with_exchange( @@ -279,9 +289,9 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); (Error = {error, _InUseOrNotFound}, _Tx) -> Error end). @@ -306,5 +316,6 @@ conditional_delete(X = #exchange{name = XName}) -> unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), + ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 547583e9ac..670551de08 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,6 +21,12 @@ behaviour_info(callbacks) -> [ {description, 0}, + + %% Should Rabbit ensure that all events delivered to an individual exchange + %% this can be serialised? (they might still be delivered out + %% of order, but there'll be a serial number). + {serialise_events, 1}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 349c2f6ee4..bc7a76e30b 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/1]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -39,6 +39,7 @@ route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> rabbit_router:match_routing_key(Name, Routes). +serialise_events(_X) -> false. validate(_X) -> ok. create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index bc5293c81d..2e70fb24f2 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/1]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -38,6 +38,7 @@ description() -> route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). +serialise_events(_X) -> false. validate(_X) -> ok. create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index d3529b0657..1e8b0687a0 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/1]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -112,6 +112,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], end, headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). +serialise_events(_X) -> false. validate(_X) -> ok. create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index ffd1e58395..e3fd9283b1 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/1]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -46,6 +46,7 @@ route(#exchange{name = X}, mnesia:async_dirty(fun trie_match/2, [X, Words]) end || RKey <- Routes]). +serialise_events(_X) -> false. validate(_X) -> ok. create(_Tx, _X) -> ok. @@ -55,19 +56,19 @@ recover(_Exchange, Bs) -> lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) end). -delete(true, #exchange{name = X}, _Bs) -> +delete(transaction, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; -delete(false, _Exchange, _Bs) -> +delete(none, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> +add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(true, #exchange{name = X}, Bs) -> +remove_bindings(transaction, #exchange{name = X}, Bs) -> %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -86,7 +87,7 @@ remove_bindings(true, #exchange{name = X}, Bs) -> [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; -remove_bindings(false, _X, _Bs) -> +remove_bindings(none, _X, _Bs) -> ok. maybe_add_path(_X, [{root, none}], PathAcc) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2e9563cf3c..3f0bc9bb38 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -409,13 +409,13 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> {TailFun, TailRes} = execute_mnesia_transaction( + fun () -> + TailFun1 = TxFun(), + Res1 = TailFun1(transaction), + {TailFun1, Res1} + end), + TailFun(TailRes) end. ensure_ok(ok, _) -> ok; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae77..c73f557d03 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -220,6 +220,10 @@ table_definitions() -> [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ca046c9198..5a37c31a71 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -694,8 +694,8 @@ test_topic_matching() -> exchange_op_callback(X, Fun, ExtraArgs) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), - rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [none, X] ++ ExtraArgs). test_topic_expect_match(X, List) -> lists:foreach( diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29ef3..28aee9c91a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({exchange_event_serialisation, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(exchange_event_serialisation/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +exchange_event_serialisation() -> + create(rabbit_exchange_serial, [{record_name, exchange_serial}, + {attributes, [name, serial]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
