diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-17 10:56:08 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-17 10:56:08 +0100 |
| commit | 453ece9b11a0cef4b9ce6c300b12ed07cde1c931 (patch) | |
| tree | 8331ad2f339f2a38bded236700d4dc53963ac73c | |
| parent | 7bc8f4795784f39d3c815c420e7658a1a0768966 (diff) | |
| parent | 3870c1b3ddaf6f147da33d98942a497ad60a8253 (diff) | |
| download | rabbitmq-server-git-453ece9b11a0cef4b9ce6c300b12ed07cde1c931.tar.gz | |
Merging default to bug23554
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
15 files changed, 206 insertions, 107 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index f8c1a13d44..1388f3c43b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,6 +43,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments}). +-record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid, mirror_pids}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index c80cc1966d..f6283ef7c1 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -16,16 +16,20 @@ -ifdef(use_specs). +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + -spec(description/0 :: () -> [{atom(), any()}]). +-spec(serialise_events/0 :: () -> boolean()). -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). --spec(delete/3 :: (boolean(), rabbit_types:exchange(), +-spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok'). +-spec(delete/3 :: (tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), +-spec(add_binding/3 :: (serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), +-spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), rabbit_framing:amqp_table()) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a681041d93..d79fe9df39 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -141,8 +141,8 @@ -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | - fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), B = add_default_binding(Q), - fun (Tx) -> B(Tx), Q end; + fun () -> B(), Q end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -226,7 +226,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), - fun (Tx) -> TailFun(Tx), ExistingQ end + fun () -> TailFun(), ExistingQ end end end end). @@ -437,9 +437,7 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> ok = rabbit_binding:process_deletions( - Deletions, Tx) - end + rabbit_binding:process_deletions(Deletions) end end). @@ -468,19 +466,15 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid, - mirror_pids = []} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end, - fun (Deletions, Tx) -> - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - Deletions), - Tx) + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid, + mirror_pids = []} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)) end). delete_queue(QueueName) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index dc119fbd5e..2f71bfab63 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -21,7 +21,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/2]). + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -77,7 +77,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). +-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -114,12 +114,14 @@ recover(XNames, QNames) -> end) end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> - case Tx of - true -> ok = sync_transient_route(R, fun mnesia:write/3); - false -> ok - end, {ok, X} = rabbit_exchange:lookup(Src), - rabbit_exchange:callback(X, add_binding, [Tx, X, B]) + Serial = case Tx of + true -> ok = sync_transient_route( + R, fun mnesia:write/3), + transaction; + false -> rabbit_exchange:serial(X) + end, + rabbit_exchange:callback(X, add_binding, [Serial, X, B]) end, rabbit_semi_durable_route), ok. @@ -142,7 +144,7 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/1 + [_] -> fun rabbit_misc:const_ok/0 end; {error, _} = Err -> rabbit_misc:const(Err) end @@ -154,10 +156,13 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, - [Tx, Src, B]), - rabbit_event:notify_if(not Tx, binding_created, - info(B)) + ok = rabbit_exchange:callback( + Src, add_binding, [transaction, Src, B]), + Serial = rabbit_exchange:serial(Src), + fun () -> + ok = rabbit_exchange:callback( + Src, add_binding, [Serial, Src, B]), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -181,7 +186,7 @@ remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), - fun (Tx) -> ok = process_deletions(Deletions, Tx) end. + process_deletions(Deletions). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -407,19 +412,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> - dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); - deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) - end - end, ok, Deletions). +process_deletions(Deletions) -> + AugmentedDeletions = + dict:map(fun (_XName, {X, deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, delete, Bs), + {X, deleted, Bs, none}; + (_XName, {X, not_deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, remove_bindings, Bs), + {X, not_deleted, Bs, rabbit_exchange:serial(X)} + end, Deletions), + fun() -> + dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> + ok = rabbit_event:notify( + exchange_deleted, [{name, XName}]), + del_notify(Bs), + x_callback(Serial, X, delete, Bs); + (_XName, {X, not_deleted, Bs, Serial}, ok) -> + del_notify(Bs), + x_callback(Serial, X, remove_bindings, Bs) + end, ok, AugmentedDeletions) + end. + +del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. + +x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 421117736d..84a44cd2ee 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -23,8 +23,8 @@ lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). -%% this must be run inside a mnesia tx --export([maybe_auto_delete/1]). +%% these must be run inside a mnesia tx +-export([maybe_auto_delete/1, serial/1]). %%---------------------------------------------------------------------------- @@ -75,6 +75,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). +-spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()). -endif. @@ -89,7 +90,7 @@ recover() -> end, fun (X, Tx) -> case Tx of - true -> ok = mnesia:write(rabbit_exchange, X, write); + true -> store(X); false -> ok end, rabbit_exchange:callback(X, create, [Tx, X]) @@ -107,13 +108,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> auto_delete = AutoDelete, internal = Internal, arguments = Args}, + XT = type_to_module(Type), %% We want to upset things if it isn't ok - ok = (type_to_module(Type)):validate(X), + ok = XT:validate(X), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, X, write), + store(X), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, X, write); @@ -125,7 +127,10 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = (type_to_module(Type)):create(Tx, Exchange), + ok = XT:create(case Tx of + true -> transaction; + false -> none + end, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -134,6 +139,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). +store(X = #exchange{name = Name, type = Type}) -> + ok = mnesia:write(rabbit_exchange, X, write), + case (type_to_module(Type)):serialise_events() of + true -> S = #exchange_serial{name = Name, next = 1}, + ok = mnesia:write(rabbit_exchange_serial, S, write); + false -> ok + end. + %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -257,27 +270,30 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( +call_with_exchange(XName, Fun) -> + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; + [] -> rabbit_misc:const({error, not_found}); [X] -> Fun(X) end - end, PrePostCommitFun). + end). delete(XName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, call_with_exchange( XName, - case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> - Error + fun (X) -> + case Fun(X) of + {deleted, X, Bs, Deletions} -> + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end end). maybe_auto_delete(#exchange{auto_delete = false}) -> @@ -297,9 +313,23 @@ conditional_delete(X = #exchange{name = XName}) -> unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), + ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. +serial(#exchange{name = XName, type = Type}) -> + case (type_to_module(Type)):serialise_events() of + true -> next_serial(XName); + false -> none + end. + +next_serial(XName) -> + [#exchange_serial{next = Serial}] = + mnesia:read(rabbit_exchange_serial, XName, write), + ok = mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = XName, next = Serial + 1}, write), + Serial. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index cd96407cc7..ab3d00dc28 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,6 +21,13 @@ behaviour_info(callbacks) -> [ {description, 0}, + + %% Should Rabbit ensure that all binding events that are + %% delivered to an individual exchange can be serialised? (they + %% might still be delivered out of order, but there'll be a + %% serial number). + {serialise_events, 0}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 40078b1a5f..b485e31f33 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> rabbit_router:match_routing_key(Name, Routes). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index f32ef91773..3c02972278 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 139feb04f8..f09e4aae73 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,6 +41,8 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 74c566b803..348655b101 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -38,6 +38,8 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + %% NB: This may return duplicate results in some situations (that's ok) route(#exchange{name = X}, #delivery{message = #basic_message{routing_keys = Routes}}) -> @@ -49,19 +51,19 @@ route(#exchange{name = X}, validate(_X) -> ok. create(_Tx, _X) -> ok. -delete(true, #exchange{name = X}, _Bs) -> +delete(transaction, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; -delete(false, _Exchange, _Bs) -> +delete(none, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> +add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(true, #exchange{name = X}, Bs) -> +remove_bindings(transaction, #exchange{name = X}, Bs) -> %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -80,7 +82,7 @@ remove_bindings(true, #exchange{name = X}, Bs) -> [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; -remove_bindings(false, _X, _Bs) -> +remove_bindings(none, _X, _Bs) -> ok. maybe_add_path(_X, [{root, none}], PathAcc) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index cec10ff609..53171e877b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,7 @@ -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([read_term_file/1, write_term_file/2]). +-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). @@ -53,7 +53,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). --export([const_ok/1, const/1]). +-export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -61,11 +61,10 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1, const/1]). +-export_type([resource_name/0, thunk/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). --type(const(T) :: fun((any()) -> T)). -type(resource_name() :: binary()). -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() @@ -154,6 +153,8 @@ -spec(read_term_file/1 :: (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). +-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). +-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). @@ -190,8 +191,8 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). --spec(const_ok/1 :: (any()) -> 'ok'). --spec(const/1 :: (A) -> const(A)). +-spec(const_ok/0 :: () -> 'ok'). +-spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). -spec(is_process_alive/1 :: (pid()) -> boolean()). @@ -404,17 +405,12 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> end), false). %% Like execute_mnesia_transaction/2, but TxFun is expected to return a -%% TailFun which gets called immediately before and after the tx commit +%% TailFun which gets called (only) immediately after the tx commit execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() end. ensure_ok(ok, _) -> ok; @@ -515,8 +511,42 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> read_term_file(File) -> file:consult(File). write_term_file(File, Terms) -> - file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || - Term <- Terms])). + write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + +write_file(Path, Data) -> + write_file(Path, Data, []). + +%% write_file/3 and make_binary/1 are both based on corresponding +%% functions in the kernel/file.erl module of the Erlang R14B02 +%% release, which is licensed under the EPL. That implementation of +%% write_file/3 does not do an fsync prior to closing the file, hence +%% the existence of this version. APIs are otherwise identical. +write_file(Path, Data, Modes) -> + Modes1 = [binary, write | (Modes -- [binary, write])], + case make_binary(Data) of + Bin when is_binary(Bin) -> + case file:open(Path, Modes1) of + {ok, Hdl} -> try file:write(Hdl, Bin) of + ok -> file:sync(Hdl); + {error, _} = E -> E + after + file:close(Hdl) + end; + {error, _} = E -> E + end; + {error, _} = E -> E + end. + +make_binary(Bin) when is_binary(Bin) -> + Bin; +make_binary(List) -> + try + iolist_to_binary(List) + catch error:Reason -> + {error, Reason} + end. + append_file(File, Suffix) -> case file:read_file_info(File) of @@ -534,7 +564,7 @@ append_file(File, 0, Suffix) -> end; append_file(File, _, Suffix) -> case file:read_file(File) of - {ok, Data} -> file:write_file([File, Suffix], Data, [append]); + {ok, Data} -> write_file([File, Suffix], Data, [append]); Error -> Error end. @@ -843,8 +873,8 @@ lock_file(Path) -> ok = file:close(Lock) end. -const_ok(_) -> ok. -const(X) -> fun (_) -> X end. +const_ok() -> ok. +const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see %% when IPv6 is enabled but not used (i.e. 99% of the time). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 745ecc0140..87a5f44ea7 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -225,6 +225,10 @@ table_definitions() -> [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 79deb46c4d..2512a60280 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -67,7 +67,7 @@ start() -> AppVersions}, %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel - file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), + rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script ScriptFile = RootName ++ ".script", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 095e358a56..3726420d53 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -691,10 +691,10 @@ test_topic_matching() -> test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. -exchange_op_callback(X, Fun, ExtraArgs) -> +exchange_op_callback(X, Fun, Args) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), - rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end), + rabbit_exchange:callback(X, Fun, [none, X] ++ Args). test_topic_expect_match(X, List) -> lists:foreach( @@ -1607,7 +1607,7 @@ test_file_handle_cache() -> [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]], Content = <<"foo">>, CopyFun = fun (Src, Dst) -> - ok = file:write_file(Src, Content), + ok = rabbit_misc:write_file(Src, Content), {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), Size = size(Content), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 842c3b4fac..31bbb9295e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,7 @@ -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). -rabbit_upgrade({semi_durable_route, mnesia, []}). +-rabbit_upgrade({exchange_event_serial, mnesia, []}). %% ------------------------------------------------------------------- @@ -38,6 +39,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(exchange_event_serial/0 :: () -> 'ok'). -spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -107,6 +109,10 @@ semi_durable_route() -> create(rabbit_semi_durable_route, [{record_name, route}, {attributes, [binding, value]}]). +exchange_event_serial() -> + create(rabbit_exchange_serial, [{record_name, exchange_serial}, + {attributes, [name, next]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
