diff options
| author | Tim Watson <tim@rabbitmq.com> | 2012-07-03 12:45:28 +0100 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2012-07-03 12:45:28 +0100 |
| commit | dbb24b9824d487ae31481a28225d6f29e9b30691 (patch) | |
| tree | 822f4fe4276ba910659261820f8076f729bbad96 | |
| parent | 030a937e8c29b76444d56b8d4bb37832a98971b5 (diff) | |
| parent | eeca9492c3cb7b69d7906922c1a89ef364f4e46a (diff) | |
| download | rabbitmq-server-git-dbb24b9824d487ae31481a28225d6f29e9b30691.tar.gz | |
merge default
31 files changed, 610 insertions, 128 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d6f6d51fab..2d25edee40 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -887,6 +887,10 @@ <listitem><para>Queue arguments.</para></listitem> </varlistentry> <varlistentry> + <term>policy</term> + <listitem><para>Policy name applying to the queue.</para></listitem> + </varlistentry> + <varlistentry> <term>pid</term> <listitem><para>Id of the Erlang process associated with the queue.</para></listitem> </varlistentry> @@ -999,6 +1003,10 @@ <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> + <varlistentry> + <term>policy</term> + <listitem><para>Policy name for applying to the exchange.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>exchangeinfoitem</command>s are specified then @@ -1207,7 +1215,7 @@ </varlistentry> <varlistentry> <term>timeout</term> - <listitem><para>Connection timeout.</para></listitem> + <listitem><para>Connection timeout / negotiated heartbeat interval, in seconds.</para></listitem> </varlistentry> <varlistentry> <term>frame_max</term> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index ffe112a0e2..523b54ced1 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,7 +19,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, - {disk_free_limit, {mem_relative, 1.0}}, + {disk_free_limit, 1000000000}, %% 1GB {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, %% 0 ("no limit") would make a better default, but that diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5c73c8b88b..e8b4a6232e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,11 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratch}). + scratches, policy}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes}). + arguments, pid, slave_pids, mirror_nodes, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 3c26872671..c67a0263f6 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -43,9 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b8822739f5..167f272e41 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -86,8 +86,8 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 849bedcf6b..4758c861da 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -154,7 +154,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/gm.erl b/src/gm.erl index 30fcdc5d86..f88ed18fbf 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -558,7 +558,7 @@ handle_call(group_members, _From, reply(not_joined, State); handle_call(group_members, _From, State = #state { view = View }) -> - reply(alive_view_members(View), State); + reply(get_pids(alive_view_members(View)), State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> @@ -647,7 +647,7 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); -handle_info({'DOWN', MRef, process, _Pid, _Reason}, +handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, right = Right, @@ -661,8 +661,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, {_, {Member1, MRef}} -> Member1; _ -> undefined end, - case Member of - undefined -> + case {Member, Reason} of + {undefined, _} -> + noreply(State); + {_, {shutdown, ring_shutdown}} -> noreply(State); _ -> View1 = diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eca1017cfc..afbaea651b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,7 +28,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([store_queue/1]). +-export([update/2, store_queue/1, policy_changed/2]). %% internal @@ -71,6 +71,9 @@ -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). +-spec(update/2 :: + (name(), + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -157,6 +160,8 @@ -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -225,9 +230,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - B = add_default_binding(Q), - fun () -> B(), Q end; + [] -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -240,6 +246,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> end end). +update(Name, Fun) -> + case mnesia:wread({rabbit_queue, Name}) of + [Q = #amqqueue{durable = Durable}] -> + Q1 = Fun(Q), + ok = mnesia:write(rabbit_queue, Q1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); + _ -> ok + end; + [] -> + ok + end. + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write), ok = mnesia:write(rabbit_queue, Q, write), @@ -248,6 +267,9 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. +policy_changed(_Q1, _Q2) -> + ok. + determine_queue_nodes(Args) -> Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), @@ -511,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f2833c2623..8933de8746 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, - [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, Q, Recover), - State1 = process_args(State#q{backing_queue_state = BQS}), - rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - Q1 -> {stop, normal, {existing, Q1}, State} - end. + not_found -> + {stop, normal, not_found, State}; + Q1 -> + case matches(Recover, Q, Q1) of + true -> + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = bq_init(BQ, Q, Recover), + State1 = process_args(State#q{backing_queue_state = BQS}), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + noreply(State1); + false -> + {stop, normal, {existing, Q1}, State} + end + end. + +matches(true, Q, Q) -> true; +matches(true, _Q, _Q1) -> false; +matches(false, Q1, Q2) -> + %% i.e. not policy + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso + Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. bq_init(BQ, Q, Recover) -> Self = self(), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb44797e4d..f0ea514dcf 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -173,13 +173,11 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - ok = rabbit_exchange:callback( - Src, add_binding, [transaction, Src, B]), + x_callback(transaction, Src, add_binding, B), Serial = rabbit_exchange:serial(Src), fun () -> - ok = rabbit_exchange:callback( - Src, add_binding, [Serial, Src, B]), - ok = rabbit_event:notify(binding_created, info(B)) + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -487,4 +485,5 @@ process_deletions(Deletions) -> del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. -x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). +x_callback(Serial, X, F, Bs) -> + ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2e163cfbe1..b23088cc7a 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -156,6 +156,11 @@ start() -> {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), usage(); + {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) -> + %% We handle this common case specially to avoid ~p since + %% that has i18n issues + print_error("~s: ~s", [Problem, Reason]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index ed29bd80b9..58375abb45 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -27,7 +27,7 @@ set_check_interval/1, get_disk_free/0]). -define(SERVER, ?MODULE). --define(DEFAULT_DISK_CHECK_INTERVAL, 60000). +-define(DEFAULT_DISK_CHECK_INTERVAL, 10000). -record(state, {dir, limit, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 910a89b42c..57c571f1ff 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,13 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, callback/3, declare/6, +-export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, update_scratch/2, + lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1]). +-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -37,7 +37,12 @@ -type(fun_name() :: atom()). -spec(recover/0 :: () -> [name()]). --spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). +-spec(callback/4:: + (rabbit_types:exchange(), fun_name(), + fun((boolean()) -> non_neg_integer()) | atom(), + [any()]) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -58,7 +63,13 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). --spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok'). +-spec(lookup_scratch/2 :: (name(), atom()) -> + rabbit_types:ok(term()) | + rabbit_types:error('not_found')). +-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok'). +-spec(update/2 :: + (name(), + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -76,14 +87,16 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(serial/1 :: (rabbit_types:exchange()) -> + fun((boolean()) -> 'none' | pos_integer())). -spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined'). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, + policy]). recover() -> Xs = rabbit_misc:table_filter( @@ -95,21 +108,52 @@ recover() -> true -> store(X); false -> ok end, - rabbit_exchange:callback(X, create, [map_create_tx(Tx), X]) + callback(X, create, map_create_tx(Tx), [X]) end, rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. -callback(#exchange{type = XType}, Fun, Args) -> - apply(type_to_module(XType), Fun, Args). +callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> + Serial = fun (Bool) -> + case Serial0 of + _ when is_atom(Serial0) -> Serial0; + _ -> Serial0(Bool) + end + end, + [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) + || M <- decorators()], + Module = type_to_module(XType), + apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). + +policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). + +serialise_events(X = #exchange{type = Type}) -> + case [Serialise || M <- decorators(), + Serialise <- [M:serialise_events(X)], + Serialise == true] of + [] -> (type_to_module(Type)):serialise_events(); + _ -> true + end. + +serial(#exchange{name = XName} = X) -> + Serial = case serialise_events(X) of + true -> next_serial(XName); + false -> none + end, + fun (true) -> Serial; + (false) -> none + end. + +decorators() -> + [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = #exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}, + X = rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -129,7 +173,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = XT:create(map_create_tx(Tx), Exchange), + ok = callback(X, create, map_create_tx(Tx), [Exchange]), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -141,13 +185,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X = #exchange{name = Name, type = Type}) -> - ok = mnesia:write(rabbit_exchange, X, write), - case (type_to_module(Type)):serialise_events() of - true -> S = #exchange_serial{name = Name, next = 1}, - ok = mnesia:write(rabbit_exchange_serial, S, write); - false -> ok - end. +store(X) -> ok = mnesia:write(rabbit_exchange, X, write). %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -200,23 +238,51 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -update_scratch(Name, Fun) -> +lookup_scratch(Name, App) -> + case lookup(Name) of + {ok, #exchange{scratches = undefined}} -> + {error, not_found}; + {ok, #exchange{scratches = Scratches}} -> + case orddict:find(App, Scratches) of + {ok, Value} -> {ok, Value}; + error -> {error, not_found} + end; + {error, not_found} -> + {error, not_found} + end. + +update_scratch(Name, App, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> - case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable, scratch = Scratch}] -> - X1 = X#exchange{scratch = Fun(Scratch)}, - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, - X1, write); - _ -> ok - end; - [] -> - ok - end + update(Name, + fun(X = #exchange{scratches = Scratches0}) -> + Scratches1 = case Scratches0 of + undefined -> orddict:new(); + _ -> Scratches0 + end, + Scratch = case orddict:find(App, Scratches1) of + {ok, S} -> S; + error -> undefined + end, + Scratches2 = orddict:store( + App, Fun(Scratch), Scratches1), + X#exchange{scratches = Scratches2} + end) end). +update(Name, Fun) -> + case mnesia:wread({rabbit_exchange, Name}) of + [X = #exchange{durable = Durable}] -> + X1 = Fun(X), + ok = mnesia:write(rabbit_exchange, X1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); + _ -> ok + end; + [] -> + ok + end. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -232,6 +298,7 @@ i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; +i(policy, X) -> rabbit_policy:name(X); i(Item, _) -> throw({bad_argument, Item}). info(X = #exchange{}) -> infos(?INFO_KEYS, X). @@ -341,23 +408,18 @@ unconditional_delete(X = #exchange{name = XName}) -> Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -serial(#exchange{name = XName, type = Type}) -> - case (type_to_module(Type)):serialise_events() of - true -> next_serial(XName); - false -> none - end. - next_serial(XName) -> - [#exchange_serial{next = Serial}] = - mnesia:read(rabbit_exchange_serial, XName, write), + Serial = peek_serial(XName, write), ok = mnesia:write(rabbit_exchange_serial, #exchange_serial{name = XName, next = Serial + 1}, write), Serial. -peek_serial(XName) -> - case mnesia:read({rabbit_exchange_serial, XName}) of +peek_serial(XName) -> peek_serial(XName, read). + +peek_serial(XName, LockType) -> + case mnesia:read(rabbit_exchange_serial, XName, LockType) of [#exchange_serial{next = Serial}] -> Serial; - _ -> undefined + _ -> 1 end. invalid_module(T) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl new file mode 100644 index 0000000000..b40ceda918 --- /dev/null +++ b/src/rabbit_exchange_decorator.erl @@ -0,0 +1,71 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_decorator). + +%% This is like an exchange type except that: +%% +%% 1) It applies to all exchanges as soon as it is installed, therefore +%% 2) It is not allowed to affect validation, so no validate/1 or +%% assert_args_equivalence/2 +%% 3) It also can't affect routing +%% +%% It's possible in the future we might relax 3), or even make these +%% able to manipulate messages as they are published. + +-ifdef(use_specs). + +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + +-callback description() -> [proplist:property()]. + +%% Should Rabbit ensure that all binding events that are +%% delivered to an individual exchange can be serialised? (they +%% might still be delivered out of order, but there'll be a +%% serial number). +-callback serialise_events(rabbit_types:exchange()) -> boolean(). + +%% called after declaration and recovery +-callback create(tx(), rabbit_types:exchange()) -> 'ok'. + +%% called after exchange (auto)deletion. +-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> + 'ok'. + +%% called after a binding has been added or recovered +-callback add_binding(serial(), rabbit_types:exchange(), + rabbit_types:binding()) -> 'ok'. + +%% called after bindings have been deleted. +-callback remove_bindings(serial(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'. + +%% called when the policy attached to this exchange changes. +-callback policy_changed ( + serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, + {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 1027570c8b..e6470b721e 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -58,6 +58,10 @@ rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). +%% called when the policy attached to this exchange changes. +-callback policy_changed ( + serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + -else. -export([behaviour_info/1]). @@ -65,7 +69,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, - {assert_args_equivalence, 2}]; + {assert_args_equivalence, 2}, {policy_changed, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index cdec1cb9f2..9a5665c078 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -43,6 +43,7 @@ route(#exchange{name = Name}, validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index a64f2c2924..d9a2f60fdd 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 61917d8f4a..516b78e59c 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 82d27960bc..101fe434e9 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). description() -> @@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 3160fdf46e..644d9acf07 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) -> delete(none, _Exchange, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. + add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); add_binding(none, _Exchange, _Binding) -> diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 59df14f318..a95f8f269d 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -102,9 +102,12 @@ read_file_info(File) -> with_fhc_handle(fun () -> prim_file:read_file_info(File) end). with_fhc_handle(Fun) -> - ok = file_handle_cache:obtain(), + with_fhc_handle(1, Fun). + +with_fhc_handle(N, Fun) -> + [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], try Fun() - after ok = file_handle_cache:release() + after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] end. read_term_file(File) -> @@ -165,7 +168,7 @@ make_binary(List) -> {error, Reason} end. - +%% TODO the semantics of this function are rather odd. But see bug 25021. append_file(File, Suffix) -> case read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -183,9 +186,11 @@ append_file(File, 0, Suffix) -> end end); append_file(File, _, Suffix) -> - case with_fhc_handle(fun () -> prim_file:read_file(File) end) of - {ok, Data} -> write_file([File, Suffix], Data, [append]); - Error -> Error + case with_fhc_handle(2, fun () -> + file:copy(File, {[File, Suffix], [append]}) + end) of + {ok, _BytesCopied} -> ok; + Error -> Error end. ensure_parent_dirs_exist(Filename) -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 71e0507a33..3e058793ec 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -405,9 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); -handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) -> +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, Msg), - {stop, Reason}; + {stop, {shutdown, ring_shutdown}}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4e71cc43db..750bcd56e2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,10 +127,21 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> + Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], + MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + monitor_wait(MRefs), State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. +monitor_wait([]) -> + ok; +monitor_wait([MRef | MRefs]) -> + receive({'DOWN', MRef, process, _Pid, _Info}) -> + ok + end, + monitor_wait(MRefs). + purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e412fbbc5d..03fafc3e5a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> ok; handle_msg([SPid], _From, {process_death, Pid}) -> inform_deaths(SPid, [Pid]); +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> + ok = gen_server2:cast(CPid, {gm, Msg}), + {stop, {shutdown, ring_shutdown}}; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Deaths) -> - rabbit_misc:with_exit_handler( - fun () -> {stop, normal} end, - fun () -> - case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of - ok -> - ok; - {promote, CPid} -> - {become, rabbit_mirror_queue_coordinator, [CPid]} - end - end). + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. %% --------------------------------------------------------------------------- %% Others diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl new file mode 100644 index 0000000000..af940dde97 --- /dev/null +++ b/src/rabbit_parameter_validation.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_parameter_validation). + +-export([number/2, binary/2, list/2, proplist/3]). + +number(_Name, Term) when is_number(Term) -> + ok; + +number(Name, Term) -> + {error, "~s should be number, actually was ~p", [Name, Term]}. + +binary(_Name, Term) when is_binary(Term) -> + ok; + +binary(Name, Term) -> + {error, "~s should be binary, actually was ~p", [Name, Term]}. + +list(_Name, Term) when is_list(Term) -> + ok; + +list(Name, Term) -> + {error, "~s should be list, actually was ~p", [Name, Term]}. + +proplist(Name, Constraints, Term) when is_list(Term) -> + {Results, Remainder} + = lists:foldl( + fun ({Key, Fun, Needed}, {Results0, Term0}) -> + case {lists:keytake(Key, 1, Term0), Needed} of + {{value, {Key, Value}, Term1}, _} -> + {[Fun(Key, Value) | Results0], + Term1}; + {false, mandatory} -> + {[{error, "Key \"~s\" not found in ~s", + [Key, Name]} | Results0], Term0}; + {false, optional} -> + {Results0, Term0} + end + end, {[], Term}, Constraints), + case Remainder of + [] -> Results; + _ -> [{error, "Unrecognised terms ~p in ~s", [Remainder, Name]} + | Results] + end; + +proplist(Name, _Constraints, Term) -> + {error, "~s not a list ~p", [Name, Term]}. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl new file mode 100644 index 0000000000..1551795f7c --- /dev/null +++ b/src/rabbit_policy.erl @@ -0,0 +1,156 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy). + +%% TODO specs + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +-export([register/0]). +-export([name/1, get/2, set/1]). +-export([validate/3, validate_clear/2, notify/3, notify_clear/2]). + +-rabbit_boot_step({?MODULE, + [{description, "policy parameters"}, + {mfa, {rabbit_policy, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +register() -> + rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE). + +name(#amqqueue{policy = Policy}) -> name0(Policy); +name(#exchange{policy = Policy}) -> name0(Policy). + +name0(undefined) -> none; +name0(Policy) -> pget(<<"name">>, Policy). + +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. + +set0(Name) -> match(Name, list()). + +get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); +get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); +%% Caution - SLOW. +get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())). + +get0(_Name, undefined) -> {error, not_found}; +get0(Name, List) -> case pget(<<"policy">>, List) of + undefined -> {error, not_found}; + Policy -> case pget(Name, Policy) of + undefined -> {error, not_found}; + Value -> {ok, Value} + end + end. + +%%---------------------------------------------------------------------------- + +validate(<<"policy">>, Name, Term) -> + rabbit_parameter_validation:proplist( + Name, policy_validation(), Term). + +validate_clear(<<"policy">>, _Name) -> + ok. + +notify(<<"policy">>, _Name, _Term) -> + update_policies(). + +notify_clear(<<"policy">>, _Name) -> + update_policies(). + +%%---------------------------------------------------------------------------- + +list() -> + [[{<<"name">>, pget(key, P)} | pget(value, P)] + || P <- rabbit_runtime_parameters:list(<<"policy">>)]. + +update_policies() -> + Policies = list(), + {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( + fun() -> + {[update_exchange(X, Policies) || + VHost <- rabbit_vhost:list(), + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies) || + VHost <- rabbit_vhost:list(), + Q <- rabbit_amqqueue:list(VHost)]} + end), + [notify(X) || X <- Xs], + [notify(Q) || Q <- Qs], + ok. + +update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> + NewPolicy = match(XName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_exchange:update( + XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), + {X, X#exchange{policy = NewPolicy}} + end. + +update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> + NewPolicy = match(QName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_amqqueue:update( + QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), + {Q, Q#amqqueue{policy = NewPolicy}} + end. + +notify(no_change)-> + ok; +notify({X1 = #exchange{}, X2 = #exchange{}}) -> + rabbit_exchange:policy_changed(X1, X2); +notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> + rabbit_amqqueue:policy_changed(Q1, Q2). + +match(Name, Policies) -> + case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of + [] -> undefined; + [Policy | _Rest] -> Policy + end. + +matches(#resource{name = Name, virtual_host = VHost}, Policy) -> + Prefix = pget(<<"prefix">>, Policy), + case pget(<<"vhost">>, Policy) of + undefined -> prefix(Prefix, Name); + VHost -> prefix(Prefix, Name); + _ -> false + end. + +prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)). + +sort_pred(A, B) -> + R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)), + case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of + {undefined, undefined} -> R; + {undefined, _} -> true; + {_, undefined} -> false; + _ -> R + end. + +%%---------------------------------------------------------------------------- + +policy_validation() -> + [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory}, + {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 07b39d8cc9..bd5cf58845 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -313,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> mainloop(Deb, State); -handle_other(timeout, _Deb, #v1{connection_state = S}) -> - throw({timeout, S}); +handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> + throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -683,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = fun() -> Parent ! timeout end, + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 637835c327..e14bbba018 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -104,9 +104,10 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 172cee92db..3a54e8f621 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). --export([parse_set/3, set/3, clear/2, list/0, list/1, list_formatted/0, - lookup/2, value/2, value/3, info_keys/0]). +-export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1, + list_formatted/0, lookup/2, value/2, value/3, info_keys/0]). %%---------------------------------------------------------------------------- @@ -31,7 +31,8 @@ -spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()). -spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). --spec(list/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). +-spec(list/1 :: (binary()) -> [rabbit_types:infos()]). +-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). -spec(list_formatted/0 :: () -> [rabbit_types:infos()]). -spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()). -spec(value/2 :: (binary(), binary()) -> term()). @@ -122,11 +123,14 @@ mnesia_clear(Component, Key) -> list() -> [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. -list(Component) -> +list(Component) -> list(Component, []). +list_strict(Component) -> list(Component, not_found). + +list(Component, Default) -> case lookup_component(Component) of {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'}, [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; - _ -> not_found + _ -> Default end. list_formatted() -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5545cccff7..bb60bd125e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -618,8 +618,8 @@ test_topic_matching() -> exchange_op_callback(X, Fun, Args) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end), - rabbit_exchange:callback(X, Fun, [none, X] ++ Args). + fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end), + rabbit_exchange:callback(X, Fun, none, [X] ++ Args). test_topic_expect_match(X, List) -> lists:foreach( diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 485ccc5f02..18704807ba 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -37,6 +37,9 @@ -rabbit_upgrade({mirrored_supervisor, mnesia, []}). -rabbit_upgrade({topic_trie_node, mnesia, []}). -rabbit_upgrade({runtime_parameters, mnesia, []}). +-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). +-rabbit_upgrade({policy, mnesia, + [exchange_scratches, ha_mirrors]}). %% ------------------------------------------------------------------- @@ -58,6 +61,7 @@ -spec(mirrored_supervisor/0 :: () -> 'ok'). -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). +-spec(policy/0 :: () -> 'ok'). -endif. @@ -193,6 +197,49 @@ runtime_parameters() -> {attributes, [key, value]}, {disc_copies, [node()]}]). +exchange_scratches() -> + ok = exchange_scratches(rabbit_exchange), + ok = exchange_scratches(rabbit_durable_exchange). + +exchange_scratches(Table) -> + transform( + Table, + fun ({exchange, Name, Type = <<"x-federation">>, Dur, AutoDel, Int, Args, + Scratch}) -> + Scratches = orddict:store(federation, Scratch, orddict:new()), + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}; + %% We assert here that nothing else uses the scratch mechanism ATM + ({exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratches]). + +policy() -> + ok = exchange_policy(rabbit_exchange), + ok = exchange_policy(rabbit_durable_exchange), + ok = queue_policy(rabbit_queue), + ok = queue_policy(rabbit_durable_queue). + +exchange_policy(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, + policy]). + +queue_policy(Table) -> + transform( + Table, + fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) -> + {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes, + undefined} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, + slave_pids, mirror_nodes, policy]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
