summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-06-07 15:11:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-06-07 15:11:16 +0100
commit5fc84cc1165ef1e45290d18a92d5841ab927282b (patch)
treed872673dce21a6e5c8d84559cd13b6db08490e85
parent707e264eb285811ec61628a14ee24c830055d881 (diff)
parentf7760cdee478f3b1134516c4580af93565afca0f (diff)
downloadrabbitmq-server-git-5fc84cc1165ef1e45290d18a92d5841ab927282b.tar.gz
Merge default
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_binding.erl11
-rw-r--r--src/rabbit_exchange.erl153
-rw-r--r--src/rabbit_exchange_decorator.erl61
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_invalid.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_parameter_validation.erl61
-rw-r--r--src/rabbit_policy.erl152
-rw-r--r--src/rabbit_registry.erl7
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl47
18 files changed, 527 insertions, 85 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d6f6d51fab..68bc87be66 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -887,6 +887,10 @@
<listitem><para>Queue arguments.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name applying to the queue.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>pid</term>
<listitem><para>Id of the Erlang process associated with the queue.</para></listitem>
</varlistentry>
@@ -999,6 +1003,10 @@
<term>arguments</term>
<listitem><para>Exchange arguments.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name for applying to the exchange.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>exchangeinfoitem</command>s are specified then
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5c73c8b88b..e8b4a6232e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,11 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratch}).
+ scratches, policy}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes}).
+ arguments, pid, slave_pids, mirror_nodes, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504e7..637a7d4d0b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,7 +28,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([store_queue/1]).
+-export([update/2, store_queue/1, policy_changed/2]).
%% internal
@@ -71,6 +71,9 @@
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -157,6 +160,8 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -222,9 +227,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun () -> B(), Q end;
+ [] -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -237,6 +243,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q = #amqqueue{durable = Durable}] ->
+ Q1 = Fun(Q),
+ ok = mnesia:write(rabbit_queue, Q1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
@@ -245,6 +264,9 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
+policy_changed(_Q1, _Q2) ->
+ ok.
+
determine_queue_nodes(Args) ->
Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f2833c2623..8933de8746 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, {new, Q}),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, Q, Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
- rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- noreply(State1);
- Q1 -> {stop, normal, {existing, Q1}, State}
- end.
+ not_found ->
+ {stop, normal, not_found, State};
+ Q1 ->
+ case matches(Recover, Q, Q1) of
+ true ->
+ gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = bq_init(BQ, Q, Recover),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
+ false ->
+ {stop, normal, {existing, Q1}, State}
+ end
+ end.
+
+matches(true, Q, Q) -> true;
+matches(true, _Q, _Q1) -> false;
+matches(false, Q1, Q2) ->
+ %% i.e. not policy
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
+ Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
bq_init(BQ, Q, Recover) ->
Self = self(),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb44797e4d..f0ea514dcf 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -173,13 +173,11 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- ok = rabbit_exchange:callback(
- Src, add_binding, [transaction, Src, B]),
+ x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
fun () ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Serial, Src, B]),
- ok = rabbit_event:notify(binding_created, info(B))
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -487,4 +485,5 @@ process_deletions(Deletions) ->
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
-x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 910a89b42c..ab548f38f3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,13 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, callback/3, declare/6,
+-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, update_scratch/2,
+ lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1]).
+-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -37,7 +37,11 @@
-type(fun_name() :: atom()).
-spec(recover/0 :: () -> [name()]).
--spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
+-spec(callback/4::
+ (rabbit_types:exchange(), fun_name(), non_neg_integer() | atom(),
+ [any()]) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -58,7 +62,13 @@
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
--spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
+-spec(lookup_scratch/2 :: (name(), atom()) ->
+ rabbit_types:ok(term()) |
+ rabbit_types:error('not_found')).
+-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -76,14 +86,16 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/1 :: (rabbit_types:exchange()) ->
+ fun(() -> 'none' | pos_integer())).
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
+ policy]).
recover() ->
Xs = rabbit_misc:table_filter(
@@ -95,21 +107,53 @@ recover() ->
true -> store(X);
false -> ok
end,
- rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
+ callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(#exchange{type = XType}, Fun, Args) ->
- apply(type_to_module(XType), Fun, Args).
+callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+ %% TODO cache this?
+ Serial = fun (Bool) ->
+ case Serial0 of
+ _ when is_atom(Serial0) -> Serial0;
+ _ -> Serial0(Bool)
+ end
+ end,
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
+ || M <- decorators()],
+ Module = type_to_module(XType),
+ apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+
+policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+
+serialise_events(X = #exchange{type = Type}) ->
+ case [Serialise || M <- decorators(),
+ Serialise <- [M:serialise_events(X)],
+ Serialise == true] of
+ [] -> (type_to_module(Type)):serialise_events();
+ _ -> true
+ end.
+
+serial(#exchange{name = XName} = X) ->
+ Serial = case serialise_events(X) of
+ true -> next_serial(XName);
+ false -> none
+ end,
+ fun (true) -> Serial;
+ (false) -> none
+ end.
+
+decorators() ->
+ [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = #exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args},
+ X = rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args}),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -129,7 +173,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = XT:create(map_create_tx(Tx), Exchange),
+ ok = callback(X, create, map_create_tx(Tx), [Exchange]),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -141,13 +185,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X = #exchange{name = Name, type = Type}) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- case (type_to_module(Type)):serialise_events() of
- true -> S = #exchange_serial{name = Name, next = 1},
- ok = mnesia:write(rabbit_exchange_serial, S, write);
- false -> ok
- end.
+store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -200,23 +238,51 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
-update_scratch(Name, Fun) ->
+lookup_scratch(Name, App) ->
+ case lookup(Name) of
+ {ok, #exchange{scratches = undefined}} ->
+ {error, not_found};
+ {ok, #exchange{scratches = Scratches}} ->
+ case orddict:find(App, Scratches) of
+ {ok, Value} -> {ok, Value};
+ error -> {error, not_found}
+ end;
+ {error, not_found} ->
+ {error, not_found}
+ end.
+
+update_scratch(Name, App, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable, scratch = Scratch}] ->
- X1 = X#exchange{scratch = Fun(Scratch)},
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange,
- X1, write);
- _ -> ok
- end;
- [] ->
- ok
- end
+ update(Name,
+ fun(X = #exchange{scratches = Scratches0}) ->
+ Scratches1 = case Scratches0 of
+ undefined -> orddict:new();
+ _ -> Scratches0
+ end,
+ Scratch = case orddict:find(App, Scratches1) of
+ {ok, S} -> S;
+ error -> undefined
+ end,
+ Scratches2 = orddict:store(
+ App, Fun(Scratch), Scratches1),
+ X#exchange{scratches = Scratches2}
+ end)
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X = #exchange{durable = Durable}] ->
+ X1 = Fun(X),
+ ok = mnesia:write(rabbit_exchange, X1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -232,6 +298,7 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(policy, X) -> rabbit_policy:name(X);
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
@@ -341,15 +408,11 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-serial(#exchange{name = XName, type = Type}) ->
- case (type_to_module(Type)):serialise_events() of
- true -> next_serial(XName);
- false -> none
- end.
-
next_serial(XName) ->
- [#exchange_serial{next = Serial}] =
- mnesia:read(rabbit_exchange_serial, XName, write),
+ Serial = case mnesia:read(rabbit_exchange_serial, XName, write) of
+ [#exchange_serial{next = Next}] -> Next;
+ [] -> 1
+ end,
ok = mnesia:write(rabbit_exchange_serial,
#exchange_serial{name = XName, next = Serial + 1}, write),
Serial.
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
new file mode 100644
index 0000000000..bfb782012e
--- /dev/null
+++ b/src/rabbit_exchange_decorator.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator).
+
+-ifdef(use_specs).
+
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplist:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events(rabbit_types:exchange()) -> boolean().
+
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
+
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
+
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
+
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
+
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
+ {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1027570c8b..e6470b721e 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -58,6 +58,10 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
-else.
-export([behaviour_info/1]).
@@ -65,7 +69,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}];
+ {assert_args_equivalence, 2}, {policy_changed, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index cdec1cb9f2..9a5665c078 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -43,6 +43,7 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index a64f2c2924..d9a2f60fdd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 61917d8f4a..516b78e59c 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 82d27960bc..101fe434e9 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
@@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3160fdf46e..644d9acf07 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
+
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
add_binding(none, _Exchange, _Binding) ->
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
new file mode 100644
index 0000000000..af940dde97
--- /dev/null
+++ b/src/rabbit_parameter_validation.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_parameter_validation).
+
+-export([number/2, binary/2, list/2, proplist/3]).
+
+number(_Name, Term) when is_number(Term) ->
+ ok;
+
+number(Name, Term) ->
+ {error, "~s should be number, actually was ~p", [Name, Term]}.
+
+binary(_Name, Term) when is_binary(Term) ->
+ ok;
+
+binary(Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}.
+
+list(_Name, Term) when is_list(Term) ->
+ ok;
+
+list(Name, Term) ->
+ {error, "~s should be list, actually was ~p", [Name, Term]}.
+
+proplist(Name, Constraints, Term) when is_list(Term) ->
+ {Results, Remainder}
+ = lists:foldl(
+ fun ({Key, Fun, Needed}, {Results0, Term0}) ->
+ case {lists:keytake(Key, 1, Term0), Needed} of
+ {{value, {Key, Value}, Term1}, _} ->
+ {[Fun(Key, Value) | Results0],
+ Term1};
+ {false, mandatory} ->
+ {[{error, "Key \"~s\" not found in ~s",
+ [Key, Name]} | Results0], Term0};
+ {false, optional} ->
+ {Results0, Term0}
+ end
+ end, {[], Term}, Constraints),
+ case Remainder of
+ [] -> Results;
+ _ -> [{error, "Unrecognised terms ~p in ~s", [Remainder, Name]}
+ | Results]
+ end;
+
+proplist(Name, _Constraints, Term) ->
+ {error, "~s not a list ~p", [Name, Term]}.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
new file mode 100644
index 0000000000..d4c2dee046
--- /dev/null
+++ b/src/rabbit_policy.erl
@@ -0,0 +1,152 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+-export([register/0]).
+-export([name/1, get/2, set/1]).
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "policy parameters"},
+ {mfa, {rabbit_policy, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
+
+name(#amqqueue{policy = Policy}) -> name0(Policy);
+name(#exchange{policy = Policy}) -> name0(Policy).
+
+name0(undefined) -> none;
+name0(Policy) -> pget(<<"name">>, Policy).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+
+set0(Name) -> match(Name, list()).
+
+get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
+get(Name, #exchange{policy = Policy}) -> get0(Name, Policy).
+
+get0(_Name, undefined) -> {error, not_found};
+get0(Name, List) -> case pget(<<"policy">>, List) of
+ undefined -> {error, not_found};
+ Policy -> case pget(Name, Policy) of
+ undefined -> {error, not_found};
+ Value -> {ok, Value}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+
+validate(<<"policy">>, Name, Term) ->
+ rabbit_parameter_validation:proplist(
+ Name, policy_validation(), Term).
+
+validate_clear(<<"policy">>, _Name) ->
+ ok.
+
+notify(<<"policy">>, _Name, _Term) ->
+ update_policies().
+
+notify_clear(<<"policy">>, _Name) ->
+ update_policies().
+
+%%----------------------------------------------------------------------------
+
+list() ->
+ [[{<<"name">>, pget(key, P)} | pget(value, P)]
+ || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+
+update_policies() ->
+ Policies = list(),
+ {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ {[update_exchange(X, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end),
+ [notify(X) || X <- Xs],
+ [notify(Q) || Q <- Qs],
+ ok.
+
+update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(XName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_exchange:update(
+ XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
+ {X, X#exchange{policy = NewPolicy}}
+ end.
+
+update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(QName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_amqqueue:update(
+ QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
+ {Q, Q#amqqueue{policy = NewPolicy}}
+ end.
+
+notify(no_change)->
+ ok;
+notify({X1 = #exchange{}, X2 = #exchange{}}) ->
+ rabbit_exchange:policy_changed(X1, X2);
+notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
+ rabbit_amqqueue:policy_changed(Q1, Q2).
+
+match(Name, Policies) ->
+ case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
+ [] -> undefined;
+ [Policy | _Rest] -> Policy
+ end.
+
+matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
+ Prefix = pget(<<"prefix">>, Policy),
+ case pget(<<"vhost">>, Policy) of
+ undefined -> prefix(Prefix, Name);
+ VHost -> prefix(Prefix, Name);
+ _ -> false
+ end.
+
+prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+
+sort_pred(A, B) ->
+ R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
+ case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
+ {undefined, undefined} -> R;
+ {undefined, _} -> true;
+ {_, undefined} -> false;
+ _ -> R
+ end.
+
+%%----------------------------------------------------------------------------
+
+policy_validation() ->
+ [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 637835c327..e14bbba018 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -104,9 +104,10 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5545cccff7..bb60bd125e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -618,8 +618,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, Args) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end),
- rabbit_exchange:callback(X, Fun, [none, X] ++ Args).
+ fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
+ rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
test_topic_expect_match(X, List) ->
lists:foreach(
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 485ccc5f02..18704807ba 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -37,6 +37,9 @@
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
-rabbit_upgrade({topic_trie_node, mnesia, []}).
-rabbit_upgrade({runtime_parameters, mnesia, []}).
+-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
+-rabbit_upgrade({policy, mnesia,
+ [exchange_scratches, ha_mirrors]}).
%% -------------------------------------------------------------------
@@ -58,6 +61,7 @@
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
+-spec(policy/0 :: () -> 'ok').
-endif.
@@ -193,6 +197,49 @@ runtime_parameters() ->
{attributes, [key, value]},
{disc_copies, [node()]}]).
+exchange_scratches() ->
+ ok = exchange_scratches(rabbit_exchange),
+ ok = exchange_scratches(rabbit_durable_exchange).
+
+exchange_scratches(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type = <<"x-federation">>, Dur, AutoDel, Int, Args,
+ Scratch}) ->
+ Scratches = orddict:store(federation, Scratch, orddict:new()),
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches};
+ %% We assert here that nothing else uses the scratch mechanism ATM
+ ({exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches]).
+
+policy() ->
+ ok = exchange_policy(rabbit_exchange),
+ ok = exchange_policy(rabbit_durable_exchange),
+ ok = queue_policy(rabbit_queue),
+ ok = queue_policy(rabbit_durable_queue).
+
+exchange_policy(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches,
+ policy]).
+
+queue_policy(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) ->
+ {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes,
+ undefined}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid,
+ slave_pids, mirror_nodes, policy]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->