summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gatherer.erl51
-rw-r--r--src/gm.erl113
-rw-r--r--src/gm_soak_test.erl4
-rw-r--r--src/gm_speed_test.erl3
-rw-r--r--src/gm_tests.erl10
-rw-r--r--src/rabbit.erl51
-rw-r--r--src/rabbit_amqqueue.erl52
-rw-r--r--src/rabbit_amqqueue_process.erl177
-rw-r--r--src/rabbit_backing_queue.erl33
-rw-r--r--src/rabbit_backing_queue_qc.erl18
-rw-r--r--src/rabbit_control_main.erl40
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_guid.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl65
-rw-r--r--src/rabbit_mirror_queue_master.erl118
-rw-r--r--src/rabbit_mirror_queue_misc.erl164
-rw-r--r--src/rabbit_mirror_queue_slave.erl322
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl33
-rw-r--r--src/rabbit_node_monitor.erl41
-rw-r--r--src/rabbit_policy.erl147
-rw-r--r--src/rabbit_policy_validator.erl37
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_registry.erl3
-rw-r--r--src/rabbit_runtime_parameters.erl109
-rw-r--r--src/rabbit_runtime_parameters_test.erl30
-rw-r--r--src/rabbit_tests.erl85
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/rabbit_variable_queue.erl28
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_vm.erl129
31 files changed, 1210 insertions, 692 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 98b360389a..29d2d71366 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -32,6 +32,7 @@
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(sync_in/2 :: (pid(), any()) -> 'ok').
-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -62,6 +63,9 @@ finish(Pid) ->
in(Pid, Value) ->
gen_server2:cast(Pid, {in, Value}).
+sync_in(Pid, Value) ->
+ gen_server2:call(Pid, {in, Value}, infinity).
+
out(Pid) ->
gen_server2:call(Pid, out, infinity).
@@ -78,19 +82,22 @@ handle_call(stop, _From, State) ->
handle_call(fork, _From, State = #gstate { forks = Forks }) ->
{reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
+handle_call({in, Value}, From, State) ->
+ {noreply, in(Value, From, State), hibernate};
+
handle_call(out, From, State = #gstate { forks = Forks,
values = Values,
blocked = Blocked }) ->
case queue:out(Values) of
+ {empty, _} when Forks == 0 ->
+ {reply, empty, State, hibernate};
{empty, _} ->
- case Forks of
- 0 -> {reply, empty, State, hibernate};
- _ -> {noreply,
- State #gstate { blocked = queue:in(From, Blocked) },
- hibernate}
- end;
- {{value, _Value} = V, NewValues} ->
- {reply, V, State #gstate { values = NewValues }, hibernate}
+ {noreply, State #gstate { blocked = queue:in(From, Blocked) },
+ hibernate};
+ {{value, {PendingIn, Value}}, NewValues} ->
+ reply(PendingIn, ok),
+ {reply, {value, Value}, State #gstate { values = NewValues },
+ hibernate}
end;
handle_call(Msg, _From, State) ->
@@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
{noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
hibernate};
-handle_cast({in, Value}, State = #gstate { values = Values,
- blocked = Blocked }) ->
- {noreply, case queue:out(Blocked) of
- {empty, _} ->
- State #gstate { values = queue:in(Value, Values) };
- {{value, From}, NewBlocked} ->
- gen_server2:reply(From, {value, Value}),
- State #gstate { blocked = NewBlocked }
- end, hibernate};
+handle_cast({in, Value}, State) ->
+ {noreply, in(Value, undefined, State), hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+%%----------------------------------------------------------------------------
+
+in(Value, From, State = #gstate { values = Values, blocked = Blocked }) ->
+ case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in({From, Value}, Values) };
+ {{value, PendingOut}, NewBlocked} ->
+ reply(From, ok),
+ gen_server2:reply(PendingOut, {value, Value}),
+ State #gstate { blocked = NewBlocked }
+ end.
+
+reply(undefined, _Reply) -> ok;
+reply(From, Reply) -> gen_server2:reply(From, Reply).
diff --git a/src/gm.erl b/src/gm.erl
index 90433e84c5..4a95de0dd1 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,7 +376,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -408,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -428,9 +429,10 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
@@ -507,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, Args, TxnFun) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -529,7 +531,7 @@ forget_group(GroupName) ->
end),
ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, Args, TxnFun]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -545,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1000,20 +1008,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(
+ Left, GroupName, TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1032,29 +1042,29 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () ->
+ GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
+ Group =
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, {Result, Group}} =
- mnesia:sync_transaction(
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
+ {Result, Group} =
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- {atomic, Group} =
- mnesia:sync_transaction(
+ Group =
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 572175410d..5fbfc22371 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -105,7 +105,9 @@ spawn_member() ->
random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ {ok, Pid} = gm:start_link(
+ ?MODULE, ?MODULE, [],
+ fun rabbit_misc:execute_mnesia_transaction/1),
Start = random:uniform(10000),
send_loop(Pid, Start, Start + random:uniform(10000)),
gm:leave(Pid),
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index dad75bd447..84d4ab2fb1 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -44,7 +44,8 @@ terminate(Owner, _Reason) ->
%% other
wile_e_coyote(Time, WriteUnit) ->
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive joined -> ok end,
timer:sleep(1000), %% wait for all to join
timer:send_after(Time, stop),
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 0a2d420469..a9c0ba9035 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -76,7 +76,9 @@ test_confirmed_broadcast() ->
test_member_death() ->
with_two_members(
fun (Pid, Pid2) ->
- {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid3} = gm:start_link(
+ ?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
timeout_joining_gm_group_3),
passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
@@ -128,10 +130,12 @@ test_broadcast_fun(Fun) ->
with_two_members(Fun) ->
ok = gm:create_tables(),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
- {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7b417b00c6..69f77824f6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -300,9 +300,9 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
- ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
+ rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
ok = print_plugin_info(rabbit_plugins:active())
@@ -312,13 +312,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
- ok = rabbit_mnesia:check_cluster_consistency(),
+ rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -330,21 +330,26 @@ boot() ->
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- boot_error({could_not_start, App, Reason}, not_available);
+ throw({could_not_start, App, Reason});
handle_app_error(App, Reason) ->
- boot_error({could_not_start, App, Reason}, not_available).
+ throw({could_not_start, App, Reason}).
start_it(StartFun) ->
try
StartFun()
+ catch
+ throw:{could_not_start, _App, _Reason}=Err ->
+ boot_error(Err, not_available);
+ _:Reason ->
+ boot_error(Reason, erlang:get_stacktrace())
after
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- rabbit_log:info("Stopping Rabbit~n"),
+ rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
stop_and_halt() ->
@@ -364,7 +369,7 @@ status() ->
{running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}],
+ {memory, rabbit_vm:memory()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
@@ -412,6 +417,9 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
+ {ok, Vsn} = application:get_key(rabbit, vsn),
+ error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n",
+ [Vsn, erlang:system_info(otp_release)]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
@@ -498,13 +506,16 @@ sort_boot_steps(UnsortedSteps) ->
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
+ {missing_functions, MissingFunctions},
"Boot step functions not exported: ~p~n",
[MissingFunctions])
end;
{error, {vertex, duplicate, StepName}} ->
- basic_boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ basic_boot_error({duplicate_boot_step, StepName},
+ "Duplicate boot step name: ~w~n", [StepName]);
{error, {edge, Reason, From, To}} ->
basic_boot_error(
+ {invalid_boot_step_dependency, From, To},
"Could not add boot step dependency of ~w on ~w:~n~s",
[To, From,
case Reason of
@@ -518,7 +529,7 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
-boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
@@ -529,23 +540,27 @@ boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
end,
- basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
-
+ basic_boot_error(Term,
+ Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
boot_error(Reason, Stacktrace) ->
- Fmt = "Error description:~n ~p~n~n"
+ Fmt = "Error description:~n ~p~n~n" ++
"Log files (may contain more information):~n ~s~n ~s~n~n",
Args = [Reason, log_location(kernel), log_location(sasl)],
+ boot_error(Reason, Fmt, Args, Stacktrace).
+
+boot_error(Reason, Fmt, Args, Stacktrace) ->
case Stacktrace of
- not_available -> basic_boot_error(Fmt, Args);
- _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n",
+ not_available -> basic_boot_error(Reason, Fmt, Args);
+ _ -> basic_boot_error(Reason, Fmt ++
+ "Stack trace:~n ~p~n~n",
Args ++ [Stacktrace])
end.
-basic_boot_error(Format, Args) ->
+basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- error_logger:error_msg(Format, Args),
+ rabbit_misc:local_info_msg(Format, Args),
timer:sleep(1000),
- exit({?MODULE, failure_during_boot}).
+ exit({?MODULE, failure_during_boot, Reason}).
%%---------------------------------------------------------------------------
%% boot step functions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8fc103e487..6ad85b24f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
- sync_slave_pids = []}),
+ sync_slave_pids = [],
+ gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
@@ -622,29 +623,50 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% returned. It is therefore safe to use a fire-and-forget cast
%% here and return the QPids - the semantics is preserved. This
%% scales much better than the case below.
- QPids = qpids(Qs),
+ {MPids, SPids} = qpids(Qs),
+ QPids = MPids ++ SPids,
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
noflow -> ok
end,
- delegate:invoke_no_result(
- QPids, fun (QPid) ->
- gen_server2:cast(QPid, {deliver, Delivery, Flow})
- end),
+
+ %% We let slaves know that they were being addressed as slaves at
+ %% the time - if they receive such a message from the channel
+ %% after they have become master they should mark the message as
+ %% 'delivered' since they do not know what the master may have
+ %% done with it.
+ MMsg = {deliver, Delivery, false, Flow},
+ SMsg = {deliver, Delivery, true, Flow},
+ delegate:invoke_no_result(MPids,
+ fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
+ delegate:invoke_no_result(SPids,
+ fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
- case delegate:invoke(
- qpids(Qs), fun (QPid) ->
- ok = gen_server2:call(QPid, {deliver, Delivery},
- infinity)
- end) of
- {[], _} -> {unroutable, []};
- {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
+ {MPids, SPids} = qpids(Qs),
+ %% see comment above
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
+ {MRouted, _} = delegate:invoke(
+ MPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, MMsg, infinity)
+ end),
+ {SRouted, _} = delegate:invoke(
+ SPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, SMsg, infinity)
+ end),
+ case MRouted ++ SRouted of
+ [] -> {unroutable, []};
+ R -> {routed, [QPid || {QPid, ok} <- R]}
end.
-qpids(Qs) -> lists:append([[QPid | SPids] ||
- #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+qpids(Qs) ->
+ {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
+ {MPidAcc, SPidAcc}) ->
+ {[QPid | MPidAcc], [SPids | SPidAcc]}
+ end, {[], []}, Qs),
+ {MPids, lists:append(SPids)}.
safe_delegate_call_ok(F, Pids) ->
{_, Bads} = delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e8d8fa5e6b..68f95778b5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/8]).
+-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -76,8 +76,8 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/8 ::
- (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
+-spec(init_with_backing_queue_state/7 ::
+ (rabbit_types:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -86,12 +86,14 @@
-define(STATISTICS_KEYS,
[pid,
+ policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages,
consumers,
+ active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
@@ -144,7 +146,7 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, Senders, MTC) ->
+ RateTRef, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -166,9 +168,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
delayed_stop = undefined,
queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
- State1 = requeue_and_run(AckTags, process_args(
- rabbit_event:init_stats_timer(
- State, #q.stats_timer))),
+ State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State1, Deliveries).
@@ -497,32 +497,29 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
- never;
-should_confirm_message(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+ {never, State};
+send_or_record_confirm(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- #q{q = #amqqueue{durable = true}}) ->
- {eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo},
- _State) ->
- {immediately, SenderPid, MsgSeqNo}.
-
-needs_confirming({eventually, _, _, _}) -> true;
-needs_confirming(_) -> false.
-
-maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel =
- gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
-maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ {eventually, State#q{msg_id_to_channel = MTC1}};
+send_or_record_confirm(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- State;
-maybe_record_confirm_message(_Confirm, State) ->
- State.
+ {immediately, State}.
+
+discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
+ State) ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([MsgId], State),
+ BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -532,58 +529,50 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
+attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+ Props = #message_properties{delivered = Delivered},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ fun (true, State1 = #q{backing_queue_state = BQS2}) ->
{AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, Props#message_properties.delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}}
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}};
+ (false, State1) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
- {Duplicate, BQS1} ->
- %% if the message has previously been seen by the BQ then
- %% it must have been seen under the same circumstances as
- %% now: i.e. if it is now a deliver_immediately then it
- %% must have been before.
- {case Duplicate of
- published -> true;
- discarded -> false
- end,
- State#q{backing_queue_state = BQS1}}
+ {published, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}};
+ {discarded, BQS1} ->
+ {false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, Delivered,
- State) ->
- Confirm = should_confirm_message(Delivery, State),
+deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+ Delivered, State) ->
+ {Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State) of
- {true, State1} ->
- maybe_record_confirm_message(Confirm, State1);
- %% the next one is an optimisations
- %% TODO: optimise the Confirm =/= never case too
- {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
- discard_delivery(Delivery, State1);
- {false, State1} ->
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
+ case attempt_delivery(Delivery, Props, State1) of
+ {true, State2} ->
+ State2;
+ %% The next one is an optimisation
+ {false, State2 = #q{ttl = 0, dlx = undefined}} ->
+ discard(Delivery, State2);
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
- BQS1
- end, State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ run_message_queue(State#q{backing_queue_state = BQS1}).
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+fetch(AckRequired, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
@@ -677,12 +666,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-backing_queue_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-
-run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
+backing_queue_timeout(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
case lookup_ch(ChPid) of
@@ -694,15 +680,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-discard_delivery(#delivery{sender = SenderPid,
- message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-
message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm),
+ needs_confirming = Confirm == eventually,
delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
@@ -715,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} =
- case DLXFun of
- undefined ->
- {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ ->
- {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, BQS1} = case DLXFun of
+ undefined -> {Next, undefined, BQS2} =
+ BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ -> {Next, Msgs, BQS2} =
+ BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
@@ -895,6 +874,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
+i(policy, #q{q = #amqqueue{name = Name}}) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ case rabbit_policy:name(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -912,6 +897,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(active_consumers, _) ->
+ active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1032,10 +1019,10 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery}, From, State) ->
+handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, false, State));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1192,10 +1179,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
- noreply(run_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ noreply(run_message_queue(
+ State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
@@ -1204,7 +1193,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, false, State1));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d69a6c3b98..af660c60a0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -24,6 +24,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
@@ -83,12 +84,16 @@
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
--callback publish_delivered(true, rabbit_types:basic_message(),
+-callback publish_delivered(rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}.
+ -> {ack(), state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -117,7 +122,7 @@
%% first time the message id appears in the result of
%% drain_confirmed. All subsequent appearances of that message id will
%% be ignored.
--callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+-callback drain_confirmed(state()) -> {msg_ids(), state()}.
%% Drop messages from the head of the queue while the supplied predicate returns
%% true. Also accepts a boolean parameter that determines whether the messages
@@ -136,7 +141,7 @@
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
--callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback ack([ack()], state()) -> {msg_ids(), state()}.
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
@@ -144,7 +149,7 @@
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
--callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback requeue([ack()], state()) -> {msg_ids(), state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -199,13 +204,6 @@
-callback is_duplicate(rabbit_types:basic_message(), state())
-> {'false'|'published'|'discarded', state()}.
-%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this is may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
--callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
-
-else.
-export([behaviour_info/1]).
@@ -213,12 +211,11 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
- {discard, 3}];
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e40d9b29e6..b37fbb29e2 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -119,7 +119,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
@@ -199,7 +199,7 @@ next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
- [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ [Msg, MsgProps, _Pid, _BQ]}) ->
#state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
@@ -213,10 +213,7 @@ next_state(S, Res,
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
- acks = case AckReq of
- true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
- false -> Acks
- end
+ acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
@@ -391,4 +388,13 @@ drop_messages(Messages) ->
end
end.
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
-endif.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index e75e1f6f7c..25f7d758c6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -70,6 +70,10 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF]},
+ {clear_policy, [?VHOST_DEF]},
+ {list_policies, [?VHOST_DEF]},
+
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -98,7 +102,9 @@
{"Bindings", rabbit_binding, info_all, info_keys},
{"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
{"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
- vhost_perms_info_keys}]).
+ vhost_perms_info_keys},
+ {"Policies", rabbit_policy, list_formatted, info_keys},
+ {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
%%----------------------------------------------------------------------------
@@ -170,8 +176,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {parse_error, {_Line, Mod, Err}} ->
- print_error("~s", [lists:flatten(Mod:format_error(Err))]),
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -458,6 +464,28 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
+action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
+ when Prio == [] orelse length(Prio) == 1 ->
+ Msg = "Setting policy ~p for pattern ~p to ~p",
+ {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
+ [P] -> {Msg ++ " with priority ~s", P}
+ end,
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
+ rpc_call(Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+
+action(clear_policy, Node, [Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing policy ~p", [Key]),
+ rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+
+action(list_policies, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
+ rabbit_policy:info_keys());
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
@@ -477,12 +505,14 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
Node, erl_eval, exprs, [Parsed, []]),
io:format("~p~n", [Value]),
ok;
- {error, E} -> {parse_error, E}
+ {error, E} -> {error_string, format_parse_error(E)}
end;
{error, E, _} ->
- {parse_error, E}
+ {error_string, format_parse_error(E)}
end.
+format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4cc96ef552..a205b23d0b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
-i(policy, X) -> rabbit_policy:name(X);
+i(policy, X) -> case rabbit_policy:name(X) of
+ none -> '';
+ Policy -> Policy
+ end;
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index ba0cb04f71..cedbbdb380 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -144,11 +144,7 @@ gen_secure() ->
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
string(G, Prefix) ->
- Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
- ($\/, Acc) -> [$\_ | Acc];
- ($\=, Acc) -> Acc;
- (Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(G)).
+ Prefix ++ "-" ++ rabbit_misc:base64url(G).
binary(G, Prefix) ->
list_to_binary(string(G, Prefix)).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 5284000bff..e1a21cf786 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -33,14 +33,14 @@
gm,
monitors,
death_fun,
- length_fun
+ depth_fun
}).
-ifdef(use_specs).
-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
rabbit_mirror_queue_master:death_fun(),
- rabbit_mirror_queue_master:length_fun()) ->
+ rabbit_mirror_queue_master:depth_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -101,19 +101,25 @@
%% channel during a publish, only some of the mirrors may receive that
%% publish. As a result of this problem, the messages broadcast over
%% the gm contain published content, and thus slaves can operate
-%% successfully on messages that they only receive via the gm. The key
-%% purpose of also sending messages directly from the channels to the
-%% slaves is that without this, in the event of the death of the
-%% master, messages could be lost until a suitable slave is promoted.
+%% successfully on messages that they only receive via the gm.
%%
-%% However, that is not the only reason. For example, if confirms are
-%% in use, then there is no guarantee that every slave will see the
-%% delivery with the same msg_seq_no. As a result, the slaves have to
-%% wait until they've seen both the publish via gm, and the publish
-%% via the channel before they have enough information to be able to
-%% perform the publish to their own bq, and subsequently issue the
-%% confirm, if necessary. Either form of publish can arrive first, and
-%% a slave can be upgraded to the master at any point during this
+%% The key purpose of also sending messages directly from the channels
+%% to the slaves is that without this, in the event of the death of
+%% the master, messages could be lost until a suitable slave is
+%% promoted. However, that is not the only reason. A slave cannot send
+%% confirms for a message until it has seen it from the
+%% channel. Otherwise, it might send a confirm to a channel for a
+%% message that it might *never* receive from that channel. This can
+%% happen because new slaves join the gm ring (and thus receive
+%% messages from the master) before inserting themselves in the
+%% queue's mnesia record (which is what channels look at for routing).
+%% As it turns out, channels will simply ignore such bogus confirms,
+%% but relying on that would introduce a dangerously tight coupling.
+%%
+%% Hence the slaves have to wait until they've seen both the publish
+%% via gm, and the publish via the channel before they issue the
+%% confirm. Either form of publish can arrive first, and a slave can
+%% be upgraded to the master at any point during this
%% process. Confirms continue to be issued correctly, however.
%%
%% Because the slave is a full process, it impersonates parts of the
@@ -154,8 +160,8 @@
%% be able to work out when their head does not differ from the master
%% (and is much simpler and cheaper than getting the master to hang on
%% to the guid of the msg at the head of its queue). When a slave is
-%% promoted to a master, it unilaterally broadcasts its length, in
-%% order to solve the problem of length requests from new slaves being
+%% promoted to a master, it unilaterally broadcasts its depth, in
+%% order to solve the problem of depth requests from new slaves being
%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
@@ -297,15 +303,15 @@
%% if they have no mirrored content at all. This is not surprising: to
%% achieve anything more sophisticated would require the master and
%% recovering slave to be able to check to see whether they agree on
-%% the last seen state of the queue: checking length alone is not
+%% the last seen state of the queue: checking depth alone is not
%% sufficient in this case.
%%
%% For more documentation see the comments in bug 23554.
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun, LengthFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
+start_link(Queue, GM, DeathFun, DepthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -317,10 +323,12 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
GM1 = case GM of
undefined ->
- {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM2} = gm:start_link(
+ QueueName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM2, _Members} ->
ok
end,
@@ -333,7 +341,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
gm = GM1,
monitors = pmon:new(),
death_fun = DeathFun,
- length_fun = LengthFun },
+ depth_fun = DepthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -343,18 +351,17 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids, ExtraNodes} ->
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of
+ {ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
end;
-handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
- ok = LengthFun(),
+handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
+ ok = DepthFun(),
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
@@ -398,9 +405,7 @@ members_changed([_CPid], _Births, []) ->
members_changed([CPid], _Births, Deaths) ->
ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
-handle_msg([_CPid], _From, master_changed) ->
- ok;
-handle_msg([CPid], _From, request_length = Msg) ->
+handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index cfef98b736..cce19c907a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,15 +17,15 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
+ status/1, invoke/3, is_duplicate/2, fold/3]).
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
+-export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
-export([init_with_existing_bq/3, stop_mirroring/1]).
@@ -46,10 +46,10 @@
-ifdef(use_specs).
--export_type([death_fun/0, length_fun/0]).
+-export_type([death_fun/0, depth_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
--type(length_fun() :: fun (() -> 'ok')).
+-type(depth_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -61,10 +61,11 @@
known_senders :: set()
}).
--spec(promote_backing_queue_state/6 ::
- (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
+-spec(promote_backing_queue_state/7 ::
+ (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
+ master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
--spec(length_fun/0 :: () -> length_fun()).
+-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
@@ -87,18 +88,27 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(Q, Recover, AsyncCallback) ->
+init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- init_with_existing_bq(Q, BQ, BQS).
-
-init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), length_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+ State.
+
+init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ end),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -147,7 +157,17 @@ stop_all_slaves(Reason, #state{gm = GM}) ->
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
- ok = gm:forget_group(proplists:get_value(group_name, Info)).
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ QName = proplists:get_value(group_name, Info),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { gm_pids = [], slave_pids = [] })
+ end),
+ ok = gm:forget_group(QName).
purge(State = #state { gm = GM,
backing_queue = BQ,
@@ -163,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
-publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
- {AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {AckTag,
- ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 })}.
+ State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ {AckTag, ensure_monitoring(ChPid, State1)}.
+
+discard(MsgId, ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQS1,
+ seen_status = dict:erase(MsgId, SS) });
+ {ok, discarded} ->
+ State
+ end.
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
@@ -355,35 +392,20 @@ is_duplicate(Message = #basic_message { id = MsgId },
{discarded, State}
end.
-discard(Msg = #basic_message { id = MsgId }, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) };
- {ok, discarded} ->
- State
- end.
-
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ Len = BQ:len(BQS1),
+ Depth = BQ:depth(BQS1),
+ true = Len == Depth, %% ASSERTION: everything must have been requeued
+ ok = gm:broadcast(GM, {depth, Depth}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
- backing_queue_state = BQS,
+ backing_queue_state = BQS1,
set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
@@ -402,7 +424,7 @@ sender_death_fun() ->
end)
end.
-length_fun() ->
+depth_fun() ->
Self = self(),
fun () ->
rabbit_amqqueue:run_backing_queue(
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 453f2f2c68..4a00846e4d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -15,27 +15,38 @@
%%
-module(rabbit_mirror_queue_misc).
+-behaviour(rabbit_policy_validator).
--export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
+-rabbit_boot_step({?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(remove_from_queue/2 ::
- (rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
+-spec(remove_from_queue/3 ::
+ (rabbit_amqqueue:name(), pid(), [pid()])
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+ (rabbit_amqqueue:name(), node()) ->
+ {'ok', atom()} | rabbit_types:error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -56,10 +67,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-
-remove_from_queue(QueueName, DeadGMPids) ->
- DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -67,55 +75,65 @@ remove_from_queue(QueueName, DeadGMPids) ->
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
- Alive = [Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
+ {Dead, GMPids1} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- Dead],
+ Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, [], []};
- _ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
+ GMPids = GMPids1, %% ASSERTION
+ {ok, QPid1, []};
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- %% Sometimes a slave dying means we need
- %% to start more on other nodes -
- %% "exactly" mode can cause this to
- %% happen.
- {_, OldNodes} = actual_queue_nodes(Q1),
- {_, NewNodes} = suggested_queue_nodes(
- Q1, ClusterNodes),
- {ok, QPid1, [QPid | SPids] -- Alive,
- NewNodes -- OldNodes};
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = GMPids1 }),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, [], []}
+ {ok, QPid1, []}
end
end
end).
on_node_up() ->
- ClusterNodes = rabbit_mnesia:cluster_nodes(running),
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName}, QNames0) ->
+ fun (Q = #amqqueue{name = QName,
+ pid = Pid,
+ slave_pids = SPids}, QNames0) ->
+ %% We don't want to pass in the whole
+ %% cluster - we don't want a situation
+ %% where starting one node causes us to
+ %% decide to start a mirror on another
+ PossibleNodes0 = [node(P) || P <- [Pid | SPids]],
+ PossibleNodes =
+ case lists:member(node(), PossibleNodes0) of
+ true -> PossibleNodes0;
+ false -> [node() | PossibleNodes0]
+ end,
{_MNode, SNodes} = suggested_queue_nodes(
- Q, ClusterNodes),
+ Q, PossibleNodes),
case lists:member(node(), SNodes) of
true -> [QName | QNames0];
false -> QNames0
end
end, [], rabbit_queue)
end),
- [ok = add_mirror(QName, node()) || QName <- QNames],
+ [{ok, _} = add_mirror(QName, node()) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
@@ -141,7 +159,7 @@ drop_mirror(QName, MirrorNode) ->
end).
add_mirrors(QName, Nodes) ->
- [ok = add_mirror(QName, Node) || Node <- Nodes],
+ [{ok, _} = add_mirror(QName, Node) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode) ->
@@ -153,11 +171,8 @@ add_mirror(QName, MirrorNode) ->
start_child(Name, MirrorNode, Q);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
- true ->
- {error,{queue_already_mirrored_on_node,
- MirrorNode}};
- false ->
- start_child(Name, MirrorNode, Q)
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q)
end
end
end).
@@ -171,20 +186,20 @@ start_child(Name, MirrorNode, Q) ->
{ok, undefined} ->
%% this means the mirror process was
%% already running on the given node.
- ok;
+ {ok, already_mirrored};
{ok, down} ->
%% Node went down between us deciding to start a mirror
%% and actually starting it. Which is fine.
- ok;
+ {ok, node_down};
{ok, SPid} ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
+ {ok, started};
{error, {{stale_master_pid, StalePid}, _}} ->
rabbit_log:warning("Detected stale HA master while adding "
"mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, StalePid]),
- ok;
+ {ok, stale_master};
{error, {{duplicate_live_master, _}=Err, _}} ->
Err;
Other ->
@@ -235,14 +250,14 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, ClusterNodes) ->
+suggested_queue_nodes(Q, PossibleNodes) ->
{MNode0, SNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, ClusterNodes).
+ {MNode, SNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -250,11 +265,11 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
- {MNode, All -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
+ {MNode, Possible -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- Unavailable = Nodes -- All,
+ Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,16 +280,26 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
false -> promote_slave(Available)
end
end;
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) ->
+%% When we need to add nodes, we randomise our candidate list as a
+%% crude form of load-balancing. TODO it would also be nice to
+%% randomise the list of ones to remove when we have too many - but
+%% that would fail to take account of synchronisation...
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = (All -- [MNode]) -- SNodes,
+ true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
suggested_queue_nodes(_, _, {MNode, _}, _) ->
{MNode, []}.
+shuffle(L) ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
+ L1.
+
actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
{case MPid of
none -> none;
@@ -310,13 +335,38 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
All = fun ({A,B}) -> [A|B] end,
OldNodes = All(actual_queue_nodes(OldQ)),
NewNodes = All(suggested_queue_nodes(NewQ)),
- %% When a mirror dies, remove_from_queue/2 might have to add new
- %% slaves (in "exactly" mode). It will check mnesia to see which
- %% slaves there currently are. If drop_mirror/2 is invoked first
- %% then when we end up in remove_from_queue/2 it will not see the
- %% slaves that add_mirror/2 will add, and also want to add them
- %% (even though we are not responding to the death of a
- %% mirror). Breakage ensues.
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy(KeyList) ->
+ validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)).
+
+validate_policy(<<"all">>, none) ->
+ ok;
+validate_policy(<<"all">>, _Params) ->
+ {error, "ha-mode=\"all\" does not take parameters", []};
+
+validate_policy(<<"nodes">>, []) ->
+ {error, "ha-mode=\"nodes\" list must be non-empty", []};
+validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) ->
+ case [I || I <- Nodes, not is_binary(I)] of
+ [] -> ok;
+ Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
+ "~p was not a string", [Invalid]}
+ end;
+validate_policy(<<"nodes">>, Params) ->
+ {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]};
+
+validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 ->
+ ok;
+validate_policy(<<"exactly">>, Params) ->
+ {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]};
+
+validate_policy(Mode, _Params) ->
+ {error, "~p is not a valid ha-mode value", [Mode]}.
+
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b4b0d4d34f..1ba1420f42 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,6 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -72,7 +71,6 @@
sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
- ack_num,
msg_id_status,
known_senders,
@@ -88,7 +86,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(#amqqueue { name = QueueName } = Q) ->
+init(Q = #amqqueue { name = QName }) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -101,23 +99,24 @@ init(#amqqueue { name = QueueName } = Q) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QueueName) end) of
- {new, MPid} ->
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, GM, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -125,7 +124,6 @@ init(#amqqueue { name = QueueName } = Q) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- ack_num = 0,
msg_id_status = dict:new(),
known_senders = pmon:new(),
@@ -134,7 +132,7 @@ init(#amqqueue { name = QueueName } = Q) ->
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
+ ok = gm:broadcast(GM, request_depth),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -143,14 +141,15 @@ init(#amqqueue { name = QueueName } = Q) ->
duplicate_live_master ->
{stop, {duplicate_live_master, Node}};
existing ->
+ gm:leave(GM),
ignore
end.
-init_it(Self, Node, QueueName) ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> add_slave(Q1, Self, MPids),
+init_it(Self, GM, Node, QName) ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
+ mnesia:read({rabbit_queue, QName}),
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
+ [] -> add_slave(Q, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -158,58 +157,50 @@ init_it(Self, Node, QueueName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> add_slave(Q1, Self, MPids -- [SPid]),
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
{new, QPid}
end
end.
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
- Q#amqqueue{slave_pids = MPids ++ [New]}).
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
-handle_call({deliver, Delivery}, From, State) ->
+handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- gm = GM,
- master_pid = MPid }) ->
- %% The GM has told us about deaths, which means we're not going to
- %% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
+ Self = self(),
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids, ExtraNodes} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ {ok, Pid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
- if node(Pid) =:= node(MPid) ->
+ case Pid of
+ MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
- node(Pid) =:= node() ->
+ Self ->
%% we've become master
QueueState = promote_me(From, State),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
{become, rabbit_amqqueue_process, QueueState, hibernate};
- true ->
- %% master has changed to not us.
+ _ ->
+ %% master has changed to not us
gen_server2:reply(From, ok),
- %% assertion, we don't need to add_mirrors/2 in this
- %% branch, see last clause in remove_from_queue/2
- [] = ExtraNodes,
erlang:monitor(process, Pid),
- %% GM is lazy. So we know of the death of the
- %% slave since it is a neighbour of ours, but
- %% until a message is sent, not all members will
- %% know. That might include the new master. So
- %% broadcast a no-op message to wake everyone up.
- ok = gm:broadcast(GM, master_changed),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -222,7 +213,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
@@ -258,8 +250,8 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
- ok = gm:broadcast(GM, {process_death, MPid}),
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
+ ok = gm:broadcast(GM, process_death),
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
@@ -295,7 +287,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
+ Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -346,16 +338,21 @@ joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
members_changed([_SPid], _Births, []) -> ok;
members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
-handle_msg([_SPid], _From, master_changed) ->
- ok;
-handle_msg([_SPid], _From, request_length) ->
+handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
-handle_msg([SPid], _From, {process_death, Pid}) ->
- inform_deaths(SPid, [Pid]);
+handle_msg([_SPid], _From, process_death) ->
+ %% Since GM is by nature lazy we need to make sure there is some
+ %% traffic when a master dies, to make sure we get informed of the
+ %% death. That's all process_death does, create some traffic. We
+ %% must not take any notice of the master death here since it
+ %% comes without ordering guarantees - there could still be
+ %% messages from the master we have yet to receive. When we get
+ %% members_changed, then there will be no more messages.
+ ok;
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
@@ -376,7 +373,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
@@ -395,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -414,16 +417,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -452,12 +455,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
- rabbit_mirror_queue_master:length_fun()),
+ rabbit_mirror_queue_master:depth_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
- %% TODO this has been in here since the beginning, but it's not
- %% obvious if it is needed. Investigate...
- ok = gm:confirmed_broadcast(GM, master_changed),
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
@@ -493,18 +493,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -522,28 +522,23 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MPids),
-
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
- NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
- AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+ CPid, BQ, BQS, GM, AckTags, SS, MPids),
+
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags,
- Deliveries, KS, MTC).
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -637,9 +632,8 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -649,34 +643,11 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
- %% BQ has confirmed it but we didn't know what the
- %% msg_seq_no was at the time. We do now!
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ {ok, Status} ->
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- {ok, {published, ChPid}} ->
- %% It was published to the BQ and we didn't know the
- %% msg_seq_no so couldn't confirm it at the time.
- {MS1, SQ1} =
- case needs_confirming(Delivery, State1) of
- never -> {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)};
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- {dict:store(MsgId, MMS, MS), SQ};
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)}
- end,
State1 #state { msg_id_status = MS1,
- sender_queues = SQ1 };
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
end.
@@ -694,42 +665,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. As a result, we
- %% may know that it needs confirming without knowing its
- %% msg_seq_no, which means that we can see the confirmation come
- %% back from the backing queue without knowing the msg_seq_no,
- %% which means that we're going to have to hang on to the fact
- %% that we've seen the msg_id confirmed until we can associate it
- %% with a msg_seq_no.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issues confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
+ dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never -> MS;
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS , MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- MS
- end};
+ send_or_record_confirm(Status, Delivery, MS, State1)};
{{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -738,52 +693,28 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
-
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
-process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, #delivery { message = #basic_message { id = MsgId } }},
- MQ2} ->
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, #delivery {}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+
+process_instruction({publish, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({publish_delivered, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {ok, maybe_store_ack(true, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })};
+process_instruction({discard, ChPid, MsgId}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -870,19 +801,16 @@ msg_ids_to_acktags(MsgIds, MA) ->
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {Acc, MAN};
- {ok, {_Num, AckTag}} -> {[AckTag | Acc],
- dict:erase(MsgId, MAN)}
+ error -> {Acc, MAN};
+ {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
-maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
- ack_num = Num }) ->
- State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
- ack_num = Num + 1 }.
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
+ State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a0536a50a9..ab9a9cebd4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -63,6 +63,7 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([base64url/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -227,6 +228,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(base64url/1 :: (binary()) -> string()).
-endif.
@@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) ->
term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+
+base64url(In) ->
+ lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(In))).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index aea455b4b8..b2f47428b7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -285,6 +285,9 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
end.
remove_node_offline_node(Node) ->
+ %% We want the running nodes *now*, so we don't call
+ %% `cluster_nodes(running)' which will just get what's in the cluster status
+ %% file.
case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of
{[], disc} ->
%% Note that while we check if the nodes was the last to
@@ -298,9 +301,13 @@ remove_node_offline_node(Node) ->
case cluster_nodes(running) -- [node(), Node] of
[] -> start_mnesia(),
try
+ %% What we want to do here is replace the last node to
+ %% go down with the current node. The way we do this
+ %% is by force loading the table, and making sure that
+ %% they are loaded.
rabbit_table:force_load(),
- forget_cluster_node(Node, false),
- ensure_mnesia_running()
+ rabbit_table:wait_for_replicated(),
+ forget_cluster_node(Node, false)
after
stop_mnesia()
end;
@@ -322,10 +329,17 @@ status() ->
[{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
IfNonEmpty(ram, cluster_nodes(ram)))}] ++
case mnesia:system_info(is_running) of
- yes -> [{running_nodes, cluster_nodes(running)}];
+ yes -> RunningNodes = cluster_nodes(running),
+ [{running_nodes, cluster_nodes(running)},
+ {partitions, mnesia_partitions(RunningNodes)}];
no -> []
end.
+mnesia_partitions(Nodes) ->
+ {Replies, _BadNodes} = rpc:multicall(
+ Nodes, rabbit_node_monitor, partitions, []),
+ [Reply || Reply = {_, R} <- Replies, R =/= []].
+
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
@@ -675,13 +689,12 @@ remove_node_if_mnesia_running(Node) ->
end.
leave_cluster() ->
- RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))),
- case not is_clustered() andalso RunningNodes =:= [] of
- true -> ok;
- false -> case lists:any(fun leave_cluster/1, RunningNodes) of
- true -> ok;
- false -> e(no_running_cluster_nodes)
- end
+ case nodes_excl_me(cluster_nodes(all)) of
+ [] -> ok;
+ AllNodes -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
end.
leave_cluster(Node) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 026aa3624e..b11c9d049a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,6 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
+-export([partitions/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -32,6 +33,8 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+-record(state, {monitors, partitions}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -50,6 +53,8 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+
-endif.
%%----------------------------------------------------------------------------
@@ -168,10 +173,23 @@ notify_left_cluster(Node) ->
ok.
%%----------------------------------------------------------------------------
+%% Server calls
+%%----------------------------------------------------------------------------
+
+partitions() ->
+ gen_server:call(?SERVER, partitions, infinity).
+
+%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, pmon:new()}.
+init([]) ->
+ {ok, _} = mnesia:subscribe(system),
+ {ok, #state{monitors = pmon:new(),
+ partitions = []}}.
+
+handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
+ {reply, {node(), Partitions}, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -179,9 +197,10 @@ handle_call(_Request, _From, State) ->
%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
-handle_cast({node_up, Node, NodeType}, Monitors) ->
+handle_cast({node_up, Node, NodeType},
+ State = #state{monitors = Monitors}) ->
case pmon:is_monitored({rabbit, Node}, Monitors) of
- true -> {noreply, Monitors};
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
@@ -191,7 +210,8 @@ handle_cast({node_up, Node, NodeType}, Monitors) ->
end,
add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
- {noreply, pmon:monitor({rabbit, Node}, Monitors)}
+ {noreply, State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}}
end;
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -210,12 +230,21 @@ handle_cast({left_cluster, Node}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
+ State = #state{monitors = Monitors}) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, pmon:erase({rabbit, Node}, Monitors)};
+ {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+
+handle_info({mnesia_system_event,
+ {inconsistent_database, running_partitioned_network, Node}},
+ State = #state{partitions = Partitions}) ->
+ Partitions1 = ordsets:to_list(
+ ordsets:add_element(Node, ordsets:from_list(Partitions))),
+ {noreply, State#state{partitions = Partitions1}};
+
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 69480c9c11..2717cc9217 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -22,11 +22,13 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2, pget/3]).
+-import(rabbit_misc, [pget/2]).
-export([register/0]).
-export([name/1, get/2, set/1]).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+ list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -41,7 +43,7 @@ name(#amqqueue{policy = Policy}) -> name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
-name0(Policy) -> pget(<<"name">>, Policy).
+name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
@@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) ->
get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
-get0(Name, List) -> case pget(<<"policy">>, List) of
+get0(Name, List) -> case pget(definition, List) of
undefined -> {error, not_found};
Policy -> case pget(Name, Policy) of
undefined -> {error, not_found};
@@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
+parse_set(VHost, Name, Pattern, Definition, undefined) ->
+ parse_set0(VHost, Name, Pattern, Definition, 0);
+parse_set(VHost, Name, Pattern, Definition, Priority) ->
+ try list_to_integer(Priority) of
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ catch
+ error:badarg -> {error, "~p priority must be a number", [Priority]}
+ end.
+
+parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set0(VHost, Name,
+ [{<<"pattern">>, list_to_binary(Pattern)},
+ {<<"definition">>, rabbit_misc:json_to_term(JSON)},
+ {<<"priority">>, Priority}]);
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Name, Pattern, Definition, Priority) ->
+ PolicyProps = [{<<"pattern">>, Pattern},
+ {<<"definition">>, Definition},
+ {<<"priority">>, case Priority of
+ undefined -> 0;
+ _ -> Priority
+ end}],
+ set0(VHost, Name, PolicyProps).
+
+set0(VHost, Name, Term) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term).
+
+delete(VHost, Name) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+
+lookup(VHost, Name) ->
+ case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
+ not_found -> not_found;
+ P -> p(P, fun ident/1)
+ end.
+
+list() ->
+ list('_').
+
+list(VHost) ->
+ list0(VHost, fun ident/1).
+
+list_formatted(VHost) ->
+ order_policies(list0(VHost, fun format/1)).
+
+list0(VHost, DefnFun) ->
+ [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
+
+order_policies(PropList) ->
+ lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
+ PropList).
+
+p(Parameter, DefnFun) ->
+ Value = pget(value, Parameter),
+ [{vhost, pget(vhost, Parameter)},
+ {name, pget(name, Parameter)},
+ {pattern, pget(<<"pattern">>, Value)},
+ {definition, DefnFun(pget(<<"definition">>, Value))},
+ {priority, pget(<<"priority">>, Value)}].
+
+format(Term) ->
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
+
+ident(X) -> X.
+
+info_keys() -> [vhost, name, pattern, definition, priority].
+
+%%----------------------------------------------------------------------------
+
validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
@@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) ->
%%----------------------------------------------------------------------------
-list(VHost) ->
- [[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-
update_policies(VHost) ->
Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
@@ -98,19 +171,17 @@ update_policies(VHost) ->
ok.
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
- NewPolicy = match(XName, Policies),
- case NewPolicy of
+ case match(XName, Policies) of
OldPolicy -> no_change;
- _ -> rabbit_exchange:update(
+ NewPolicy -> rabbit_exchange:update(
XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
{X, X#exchange{policy = NewPolicy}}
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
- NewPolicy = match(QName, Policies),
- case NewPolicy of
+ case match(QName, Policies) of
OldPolicy -> no_change;
- _ -> rabbit_amqqueue:update(
+ NewPolicy -> rabbit_amqqueue:update(
QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
{Q, Q#amqqueue{policy = NewPolicy}}
end.
@@ -129,14 +200,52 @@ match(Name, Policies) ->
end.
matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
-sort_pred(A, B) ->
- pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
+sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
- {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"definition">>, fun validation/2, mandatory}].
+
+validation(_Name, []) ->
+ {error, "no policy provided", []};
+validation(_Name, Terms) when is_list(Terms) ->
+ {Keys, Modules} = lists:unzip(
+ rabbit_registry:lookup_all(policy_validator)),
+ [] = dups(Keys), %% ASSERTION
+ Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
+ {TermKeys, _} = lists:unzip(Terms),
+ case dups(TermKeys) of
+ [] -> validation0(Validators, Terms);
+ Dup -> {error, "~p duplicate keys not allowed", [Dup]}
+ end;
+validation(_Name, Term) ->
+ {error, "parse error while reading policy: ~p", [Term]}.
+
+validation0(Validators, Terms) ->
+ case lists:foldl(
+ fun (Mod, {ok, TermsLeft}) ->
+ ModKeys = proplists:get_all_values(Mod, Validators),
+ case [T || {Key, _} = T <- TermsLeft,
+ lists:member(Key, ModKeys)] of
+ [] -> {ok, TermsLeft};
+ Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
+ end;
+ (_, Acc) ->
+ Acc
+ end, {ok, Terms}, proplists:get_keys(Validators)) of
+ {ok, []} ->
+ ok;
+ {ok, Unvalidated} ->
+ {error, "~p are not recognised policy settings", [Unvalidated]};
+ {Error, _} ->
+ Error
+ end.
+
+a2b(A) -> list_to_binary(atom_to_list(A)).
+
+dups(L) -> L -- lists:usort(L).
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
new file mode 100644
index 0000000000..b59dec2b47
--- /dev/null
+++ b/src/rabbit_policy_validator.erl
@@ -0,0 +1,37 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy_validator).
+
+-ifdef(use_specs).
+
+-type(validate_results() ::
+ 'ok' | {error, string(), [term()]} | [validate_results()]).
+
+-callback validate_policy([{binary(), term()}]) -> validate_results().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {validate_policy, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6d6c648acb..21f581548d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
+ gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index e14bbba018..32709d2484 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) ->
class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator.
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index b58b459a7f..4a83e61ff9 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/4, set/4, clear/3,
- list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
- lookup/3, value/3, value/4, info_keys/0]).
+-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
+ list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3,
+ value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -32,8 +32,12 @@
-> ok_or_error_string()).
-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
+-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
@@ -57,29 +61,37 @@
%%---------------------------------------------------------------------------
-parse_set(VHost, Component, Key, String) ->
+parse_set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+parse_set(VHost, Component, Name, String) ->
case rabbit_misc:json_decode(String) of
- {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
+ {ok, JSON} -> set(VHost, Component, Name,
+ rabbit_misc:json_to_term(JSON));
error -> {error_string, "JSON decoding error"}
end.
-set(VHost, Component, Key, Term) ->
- case set0(VHost, Component, Key, Term) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
+set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+set(VHost, Component, Name, Term) ->
+ set_any(VHost, Component, Name, Term).
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(VHost, Component, Key, Term) ->
+set_any(VHost, Component, Name, Term) ->
+ case set_any0(VHost, Component, Name, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+set_any0(VHost, Component, Name, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of
+ case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of
ok ->
- case mnesia_update(VHost, Component, Key, Term) of
+ case mnesia_update(VHost, Component, Name, Term) of
{old, Term} -> ok;
- _ -> Mod:notify(VHost, Component, Key, Term)
+ _ -> Mod:notify(VHost, Component, Name, Term)
end,
ok;
E ->
@@ -89,43 +101,49 @@ set0(VHost, Component, Key, Term) ->
E
end.
-mnesia_update(VHost, Component, Key, Term) ->
+mnesia_update(VHost, Component, Name, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Name, Term), write),
Res
end).
-clear(VHost, Component, Key) ->
- case clear0(VHost, Component, Key) of
+clear(_, <<"policy">> , _) ->
+ {error_string, "policies may not be cleared using this method"};
+clear(VHost, Component, Name) ->
+ clear_any(VHost, Component, Name).
+
+clear_any(VHost, Component, Name) ->
+ case clear_any0(VHost, Component, Name) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(VHost, Component, Key) ->
+clear_any0(VHost, Component, Name) ->
case lookup_component(Component) of
{ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Key)) of
- ok -> mnesia_clear(VHost, Component, Key),
- Mod:notify_clear(VHost, Component, Key),
+ Mod:validate_clear(VHost, Component, Name)) of
+ ok -> mnesia_clear(VHost, Component, Name),
+ Mod:notify_clear(VHost, Component, Name),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(VHost, Component, Key) ->
+mnesia_clear(VHost, Component, Name) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Name}, write)
end).
list() ->
- [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
list(VHost) -> list(VHost, '_', []).
list_strict(Component) -> list('_', Component, not_found).
@@ -136,60 +154,63 @@ list(VHost, Component, Default) ->
case component_good(Component) of
true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
_ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ mnesia:dirty_match_object(?TABLE, Match),
+ Comp =/= <<"policy">> orelse
+ Component =:= <<"policy">>];
_ -> Default
end.
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Key, Default) ->
- Params = lookup0(VHost, Component, Key,
+value(VHost, Component, Name, Default) ->
+ Params = lookup0(VHost, Component, Name,
fun () ->
- lookup_missing(VHost, Component, Key, Default)
+ lookup_missing(VHost, Component, Name, Default)
end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
+lookup0(VHost, Component, Name, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Key, Default) ->
+lookup_missing(VHost, Component, Name, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
- [] -> Record = c(VHost, Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
+ [] -> Record = c(VHost, Component, Name, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Key, Default) ->
- #runtime_parameters{key = {VHost, Component, Key},
+c(VHost, Component, Name, Default) ->
+ #runtime_parameters{key = {VHost, Component, Name},
value = Default}.
-p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
[{vhost, VHost},
{component, Component},
- {key, Key},
+ {name, Name},
{value, Value}].
-info_keys() -> [component, key, value].
+info_keys() -> [component, name, value].
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index 5224ccaa36..d4d7271e90 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -16,9 +16,14 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
+-behaviour(rabbit_policy_validator).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
+-export([validate_policy/1]).
+-export([register_policy_validator/0, unregister_policy_validator/0]).
+
+%----------------------------------------------------------------------------
register() ->
rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE).
@@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
+
+%----------------------------------------------------------------------------
+
+register_policy_validator() ->
+ rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE),
+ rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE).
+
+unregister_policy_validator() ->
+ rabbit_registry:unregister(policy_validator, <<"testeven">>),
+ rabbit_registry:unregister(policy_validator, <<"testpos">>).
+
+validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) ->
+ case length(Terms) rem 2 =:= 0 of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) ->
+ case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy(_) ->
+ {error, "meh", []}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 11f280bb7f..962bb64889 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -57,6 +57,7 @@ all_tests() ->
passed = test_dynamic_mirroring(),
passed = test_user_management(),
passed = test_runtime_parameters(),
+ passed = test_policy_validation(),
passed = test_server_status(),
passed = test_confirms(),
passed =
@@ -774,7 +775,9 @@ test_log_management_during_startup() ->
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_tty_no_handlers_test});
- {error, {cannot_log_to_tty, _, _}} -> ok
+ {badrpc, {'EXIT', {rabbit,failure_during_boot,
+ {error,{cannot_log_to_tty,
+ _, not_installed}}}}} -> ok
end,
%% fix sasl logging
@@ -798,7 +801,9 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ {badrpc, {'EXIT',
+ {rabbit, failure_during_boot,
+ {error, {cannot_log_to_file, _, _}}}}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -809,8 +814,11 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotatation_parent_dirs_test});
- {error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {badrpc,
+ {'EXIT', {rabbit,failure_during_boot,
+ {error, {cannot_log_to_file, _,
+ {error,
+ {cannot_create_parent_dirs, _, eacces}}}}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),
@@ -886,38 +894,49 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
Policy, Params, {OldM, OldSs}, All),
- NewSs = lists:sort(NewSs0)
+ NewSs1 = lists:sort(NewSs0),
+ case dm_list_match(NewSs, NewSs1, ExtraSs) of
+ ok -> ok;
+ error -> exit({no_match, NewSs, NewSs1, ExtraSs})
+ end
end,
- Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
%% Add a node
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
%% Promote slave to master by policy
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
passed.
+%% Does the first list match the second where the second is required
+%% to have exactly Extra superfluous items?
+dm_list_match([], [], 0) -> ok;
+dm_list_match(_, [], _Extra) -> error;
+dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra);
+dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1).
+
test_user_management() ->
%% lots if stuff that should fail
@@ -1028,6 +1047,26 @@ test_runtime_parameters() ->
rabbit_runtime_parameters_test:unregister(),
passed.
+test_policy_validation() ->
+ rabbit_runtime_parameters_test:register_policy_validator(),
+ SetPol =
+ fun (Key, Val) ->
+ control_action(
+ set_policy,
+ ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
+
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
+
+ {error_string, _} = SetPol("testpos", [-1, 0, 1]),
+ {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+
+ rabbit_runtime_parameters_test:unregister_policy_validator(),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
@@ -1089,8 +1128,8 @@ test_server_status() ->
ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
%% eval
- {parse_error, _} = control_action(eval, ["\""]),
- {parse_error, _} = control_action(eval, ["a("]),
+ {error_string, _} = control_action(eval, ["\""]),
+ {error_string, _} = control_action(eval, ["a("]),
ok = control_action(eval, ["a."]),
%% cleanup
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index ddc9c565b1..21fdcd667b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -42,6 +42,7 @@
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
+-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
%% -------------------------------------------------------------------
@@ -66,6 +67,7 @@
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
+-spec(gm_pids/0 :: () -> 'ok').
-endif.
@@ -268,6 +270,19 @@ no_mirror_nodes() ->
|| T <- Tables],
ok.
+gm_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddGMPidsFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []}
+ end,
+ [ok = transform(T, AddGMPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy, gm_pids])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddb136a73d..8a3fd9d917 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,11 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
ram_msg_count = RamMsgCount + 1,
unconfirmed = UC1 })).
-publish_delivered(false, #basic_message { id = MsgId },
- #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { async_callback = Callback,
- len = 0 }) ->
- case NeedsConfirming of
- true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
- false -> ok
- end,
- {undefined, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
+publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, State = #vqstate { len = 0,
@@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+discard(_MsgId, _ChPid, State) -> State.
+
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
true -> {[], State}; %% common case
@@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
is_duplicate(_Msg, State) -> {false, State}.
-discard(_Msg, _ChPid, State) -> State.
-
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -1325,12 +1316,9 @@ must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-blind_confirm(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
- blind_confirm(Callback, MsgIdSet);
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,
fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 03dfbe245d..297fa56fe3 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -97,6 +97,8 @@ internal_delete(VHostPath) ->
proplists:get_value(component, Info),
proplists:get_value(key, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
new file mode 100644
index 0000000000..53f3df18b3
--- /dev/null
+++ b/src/rabbit_vm.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_vm).
+
+-export([memory/0]).
+
+-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
+ "rfc4627_jsonrpc"]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(memory/0 :: () -> rabbit_types:infos()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Like erlang:memory(), but with awareness of rabbit-y things
+memory() ->
+ Conns = (sup_memory(rabbit_tcp_client_sup) +
+ sup_memory(ssl_connection_sup) +
+ sup_memory(amqp_sup)),
+ Qs = (sup_memory(rabbit_amqqueue_sup) +
+ sup_memory(rabbit_mirror_queue_slave_sup)),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
+ MsgIndexProc = (pid_memory(msg_store_transient) +
+ pid_memory(msg_store_persistent)),
+ MgmtDbETS = ets_memory(rabbit_mgmt_db),
+ MgmtDbProc = sup_memory(rabbit_mgmt_sup),
+ Plugins = plugin_memory() - MgmtDbProc,
+
+ [{total, Total},
+ {processes, Processes},
+ {ets, ETS},
+ {atom, Atom},
+ {binary, Bin},
+ {code, Code},
+ {system, System}] =
+ erlang:memory([total, processes, ets, atom, binary, code, system]),
+
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+
+ [{total, Total},
+ {connection_procs, Conns},
+ {queue_procs, Qs},
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {mnesia, Mnesia},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+ {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+ {binary, Bin},
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Atom - Bin - Code}].
+
+%% [1] - erlang:memory(processes) can be less than the sum of its
+%% parts. Rather than display something nonsensical, just silence any
+%% claims about negative memory. See
+%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+
+%%----------------------------------------------------------------------------
+
+sup_memory(Sup) ->
+ lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
+ pid_memory(Sup).
+
+sup_children(Sup) ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+
+pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
+ {memory, M} -> M;
+ _ -> 0
+ end;
+pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
+ P when is_pid(P) -> pid_memory(P);
+ _ -> 0
+ end.
+
+child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
+child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
+child_memory(_, _) -> 0.
+
+mnesia_memory() ->
+ case mnesia:system_info(is_running) of
+ yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
+ Tab <- mnesia:system_info(tables)]);
+ no -> 0
+ end.
+
+ets_memory(Name) ->
+ lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(),
+ N <- [ets:info(T, name)],
+ N =:= Name]).
+
+bytes(Words) -> Words * erlang:system_info(wordsize).
+
+plugin_memory() ->
+ lists:sum([plugin_memory(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
+
+plugin_memory(App) ->
+ case catch application_master:get_child(
+ application_controller:get_master(App)) of
+ {Pid, _} -> sup_memory(Pid);
+ _ -> 0
+ end.
+
+is_plugin("rabbitmq_" ++ _) -> true;
+is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).