diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-28 21:20:46 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-28 21:20:46 +0100 |
| commit | e52c5e65eea60e770631d80735cc1bd63d56c12b (patch) | |
| tree | 59386f4b2090eae29d6398a4e2168f8989ef3a6a /src | |
| parent | 1c2cfb95baaf6654360c6156776ec6e5a9606a77 (diff) | |
| parent | 4e16d0c2d08c7ba8935427afcf215de7cc838a7b (diff) | |
| download | rabbitmq-server-git-e52c5e65eea60e770631d80735cc1bd63d56c12b.tar.gz | |
merge default into bug23939
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
12 files changed, 144 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965d7..167b1a55ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -214,7 +214,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), B = add_default_binding(Q), - fun (Tx) -> B(Tx), Q end; + fun () -> B(), Q end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -222,7 +222,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), - fun (Tx) -> TailFun(Tx), ExistingQ end + fun () -> TailFun(), ExistingQ end end end end). @@ -433,9 +433,7 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> ok = rabbit_binding:process_deletions( - Deletions, Tx) - end + rabbit_binding:process_deletions(Deletions) end end). @@ -464,18 +462,14 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end, - fun (Deletions, Tx) -> - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - Deletions), - Tx) + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)) end). delete_queue(QueueName) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6167790e58..1336223244 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -21,7 +21,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/2]). + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -77,7 +77,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -122,21 +122,23 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> ok = sync_binding(B, all_durable([Src, Dst]), - fun mnesia:write/3), - fun (Tx) -> - ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) - end; - [_] -> fun rabbit_misc:const_ok/1 + [] -> add_notify(Src, Dst, B); + [_] -> fun rabbit_misc:const_ok/0 end; {error, _} = Err -> rabbit_misc:const(Err) end end). +add_notify(Src, Dst, B) -> + ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:write/3), + ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]), + Serial = serial(Src), + fun () -> + ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]), + ok = rabbit_event:notify(binding_created, info(B)) + end. + remove(Binding, InnerFun) -> binding_action( Binding, @@ -158,10 +160,8 @@ remove(Binding, InnerFun) -> end end, case Result of - {error, _} = Err -> - rabbit_misc:const(Err); - {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + {error, _} = Err -> rabbit_misc:const(Err); + {ok, Deletions} -> process_deletions(Deletions) end end). @@ -405,19 +405,46 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> - dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); - deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) - end - end, ok, Deletions). +process_deletions(Deletions) -> + Serials = dict:fold( + fun (_XName, {X, Deleted, Bindings}, Acc) -> + FlatBindings = lists:flatten(Bindings), + pd_callback(transaction, X, Deleted, FlatBindings), + dict:store(X, serial(X), Acc) + end, Deletions, dict:new()), + fun() -> + dict:fold( + fun (XName, {X, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + Serial = dict:fetch(X, Serials), + pd_callback(Serial, X, Deleted, FlatBindings), + [rabbit_event:notify(binding_deleted, info(B)) || + B <- FlatBindings], + case Deleted of + deleted -> ok = rabbit_event:notify( + exchange_deleted, [{name, XName}]); + _ -> ok + end + end, Deletions, ok) + end. + +pd_callback(Arg, X, Deleted, Bindings) -> + ok = rabbit_exchange:callback(X, case Deleted of + not_deleted -> remove_bindings; + deleted -> delete + end, [Arg, X, Bindings]). + +serial(X) -> + case rabbit_exchange:serialise_events(X) of + true -> next_serial(X); + false -> none + end. + +next_serial(#exchange{name = Name}) -> + Serial = case mnesia:read(rabbit_exchange_serial, Name, write) of + [] -> 1; + [#exchange_serial{serial = S}] -> S + 1 + end, + mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = Name, serial = Serial}, write), + Serial. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index cab6510bdd..6801705279 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -22,7 +22,7 @@ assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, - publish/2, delete/2]). + publish/2, delete/2, serialise_events/1]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). @@ -72,10 +72,10 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). +-spec(serialise_events/1:: (rabbit_types:exchange()) -> boolean()). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). - -endif. %%---------------------------------------------------------------------------- @@ -131,6 +131,13 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> + S = case Tx of + true -> transaction; + false -> case serialise_events(Exchange) of + true -> 0; + false -> none + end + end, ok = (type_to_module(Type)):create(Tx, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; @@ -268,29 +275,37 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( +call_with_exchange(XName, Fun) -> + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; + [] -> rabbit_misc:const({error, not_found}); [X] -> Fun(X) end - end, PrePostCommitFun). + end). delete(XName, IfUnused) -> + delete0(XName, case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end). + +delete0(XName, Fun) -> call_with_exchange( XName, - case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> - Error + fun (X) -> + case Fun(X) of + {deleted, X, Bs, Deletions} -> + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end end). +serialise_events(#exchange{type = XType}) -> + apply(type_to_module(XType), serialise_events, []). + maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> @@ -308,5 +323,6 @@ conditional_delete(X = #exchange{name = XName}) -> unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), + ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 547583e9ac..d1563a6211 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,6 +21,13 @@ behaviour_info(callbacks) -> [ {description, 0}, + + %% Should Rabbit ensure that all binding events that are + %% delivered to an individual exchange can be serialised? (they + %% might still be delivered out of order, but there'll be a + %% serial number). + {serialise_events, 0}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 349c2f6ee4..687567a8cd 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> rabbit_router:match_routing_key(Name, Routes). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index bc5293c81d..cbde0dd2c5 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index d3529b0657..89f8fcfbf9 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,6 +41,8 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index ffd1e58395..7f3d83e05c 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -38,6 +38,8 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + %% NB: This may return duplicate results in some situations (that's ok) route(#exchange{name = X}, #delivery{message = #basic_message{routing_keys = Routes}}) -> @@ -55,19 +57,19 @@ recover(_Exchange, Bs) -> lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) end). -delete(true, #exchange{name = X}, _Bs) -> +delete(transaction, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; -delete(false, _Exchange, _Bs) -> +delete(none, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> +add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(true, #exchange{name = X}, Bs) -> +remove_bindings(transaction, #exchange{name = X}, Bs) -> %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -86,7 +88,7 @@ remove_bindings(true, #exchange{name = X}, Bs) -> [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; -remove_bindings(false, _X, _Bs) -> +remove_bindings(none, _X, _Bs) -> ok. maybe_add_path(_X, [{root, none}], PathAcc) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2e9563cf3c..45f599993a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -54,7 +54,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). --export([const_ok/1, const/1]). +-export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -191,7 +191,7 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). --spec(const_ok/1 :: (any()) -> 'ok'). +-spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -409,13 +409,8 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() end. ensure_ok(ok, _) -> ok; @@ -847,8 +842,8 @@ lock_file(Path) -> ok = file:close(Lock) end. -const_ok(_) -> ok. -const(X) -> fun (_) -> X end. +const_ok() -> ok. +const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see %% when IPv6 is enabled but not used (i.e. 99% of the time). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae77..c73f557d03 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -220,6 +220,10 @@ table_definitions() -> [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ca046c9198..5a37c31a71 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -694,8 +694,8 @@ test_topic_matching() -> exchange_op_callback(X, Fun, ExtraArgs) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), - rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [none, X] ++ ExtraArgs). test_topic_expect_match(X, List) -> lists:foreach( diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29ef3..7c53e99694 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({exchange_event_serial, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(exchange_event_serial/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +exchange_event_serial() -> + create(rabbit_exchange_serial, [{record_name, exchange_serial}, + {attributes, [name, serial]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
