diff options
| author | Tim Watson <tim@rabbitmq.com> | 2012-10-23 14:23:16 +0100 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2012-10-23 14:23:16 +0100 |
| commit | 6c7c60ebde56a3cd4f16e0da39918fdefe63bc39 (patch) | |
| tree | 277f1f89ab142cf66942082b31cfd77c82afb7b6 /src | |
| parent | b22b967f9edf101d0beeaf555834f7f6857d8def (diff) | |
| parent | d6e17b83681d56cff15559ddbe4df96476077ead (diff) | |
| download | rabbitmq-server-git-6c7c60ebde56a3cd4f16e0da39918fdefe63bc39.tar.gz | |
merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 113 | ||||
| -rw-r--r-- | src/gm_soak_test.erl | 4 | ||||
| -rw-r--r-- | src/gm_speed_test.erl | 3 | ||||
| -rw-r--r-- | src/gm_tests.erl | 10 | ||||
| -rw-r--r-- | src/rabbit.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 59 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 214 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_policy_validator.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_registry.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 109 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 |
26 files changed, 720 insertions, 391 deletions
diff --git a/src/gm.erl b/src/gm.erl index 90433e84c5..4a95de0dd1 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -376,7 +376,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/3, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -408,7 +408,8 @@ callback_args, confirms, broadcast_buffer, - broadcast_timer + broadcast_timer, + txn_executor }). -record(gm_group, { name, version, members }). @@ -428,9 +429,10 @@ -export_type([group_name/0]). -type(group_name() :: any()). +-type(txn_fun() :: fun((fun(() -> any())) -> any())). -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). --spec(start_link/3 :: (group_name(), atom(), any()) -> +-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) -> rabbit_types:ok_pid_or_error()). -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). @@ -507,8 +509,8 @@ table_definitions() -> {Name, Attributes} = ?TABLE, [{Name, [?TABLE_MATCH | Attributes]}]. -start_link(GroupName, Module, Args) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args], []). +start_link(GroupName, Module, Args, TxnFun) -> + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). leave(Server) -> gen_server2:cast(Server, leave). @@ -529,7 +531,7 @@ forget_group(GroupName) -> end), ok. -init([GroupName, Module, Args]) -> +init([GroupName, Module, Args, TxnFun]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), @@ -545,7 +547,8 @@ init([GroupName, Module, Args]) -> callback_args = Args, confirms = queue:new(), broadcast_buffer = [], - broadcast_timer = undefined }, hibernate, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From, view = View, members_state = MembersState, module = Module, - callback_args = Args }) -> + callback_args = Args, + txn_executor = TxnFun }) -> {MembersState1, Group} = record_new_member_in_group( GroupName, Self, NewMember, @@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From, {catchup, Self, prepare_members_state(MembersState1)}), MembersState1 - end), + end, TxnFun), View2 = group_to_view(Group), State1 = check_neighbours(State #state { view = View2, members_state = MembersState1 }), @@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self, group_name = GroupName, members_state = undefined, module = Module, - callback_args = Args }) -> - View = join_group(Self, GroupName), + callback_args = Args, + txn_executor = TxnFun }) -> + View = join_group(Self, GroupName, TxnFun), MembersState = case alive_view_members(View) of [Self] -> blank_member_state(); @@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, view = View, module = Module, callback_args = Args, - confirms = Confirms }) -> + confirms = Confirms, + txn_executor = TxnFun }) -> Member = case {Left, Right} of {{Member1, MRef}, _} -> Member1; {_, {Member1, MRef}} -> Member1; @@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, noreply(State); _ -> View1 = - group_to_view(record_dead_member_in_group(Member, GroupName)), + group_to_view(record_dead_member_in_group(Member, + GroupName, TxnFun)), {Result, State2} = case alive_view_members(View1) of [Self] -> @@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) -> %% View modification %% --------------------------------------------------------------------------- -join_group(Self, GroupName) -> - join_group(Self, GroupName, read_group(GroupName)). +join_group(Self, GroupName, TxnFun) -> + join_group(Self, GroupName, read_group(GroupName), TxnFun). -join_group(Self, GroupName, {error, not_found}) -> - join_group(Self, GroupName, prune_or_create_group(Self, GroupName)); -join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) -> +join_group(Self, GroupName, {error, not_found}, TxnFun) -> + join_group(Self, GroupName, + prune_or_create_group(Self, GroupName, TxnFun), TxnFun); +join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> group_to_view(Group); -join_group(Self, GroupName, #gm_group { members = Members } = Group) -> +join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> group_to_view(Group); @@ -1000,20 +1008,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> case lists:filter(fun is_member_alive/1, Members) of [] -> join_group(Self, GroupName, - prune_or_create_group(Self, GroupName)); + prune_or_create_group(Self, GroupName, TxnFun)); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = fun () -> join_group( Self, GroupName, - record_dead_member_in_group(Left, GroupName)) + record_dead_member_in_group( + Left, GroupName, TxnFun), + TxnFun) end, try case gen_server2:call( get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName) + not_ready -> join_group(Self, GroupName, TxnFun) end catch exit:{R, _} @@ -1032,29 +1042,29 @@ read_group(GroupName) -> [Group] -> Group end. -prune_or_create_group(Self, GroupName) -> - {atomic, Group} = - mnesia:sync_transaction( - fun () -> GroupNew = #gm_group { name = GroupName, - members = [Self], - version = ?VERSION_START }, - case mnesia:read({?GROUP_TABLE, GroupName}) of - [] -> - mnesia:write(GroupNew), - GroupNew; - [Group1 = #gm_group { members = Members }] -> - case lists:any(fun is_member_alive/1, Members) of - true -> Group1; - false -> mnesia:write(GroupNew), - GroupNew - end - end - end), +prune_or_create_group(Self, GroupName, TxnFun) -> + Group = TxnFun( + fun () -> + GroupNew = #gm_group { name = GroupName, + members = [Self], + version = ?VERSION_START }, + case mnesia:read({?GROUP_TABLE, GroupName}) of + [] -> + mnesia:write(GroupNew), + GroupNew; + [Group1 = #gm_group { members = Members }] -> + case lists:any(fun is_member_alive/1, Members) of + true -> Group1; + false -> mnesia:write(GroupNew), + GroupNew + end + end + end), Group. -record_dead_member_in_group(Member, GroupName) -> - {atomic, Group} = - mnesia:sync_transaction( +record_dead_member_in_group(Member, GroupName, TxnFun) -> + Group = + TxnFun( fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = mnesia:read({?GROUP_TABLE, GroupName}), case lists:splitwith( @@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) -> end), Group. -record_new_member_in_group(GroupName, Left, NewMember, Fun) -> - {atomic, {Result, Group}} = - mnesia:sync_transaction( +record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) -> + {Result, Group} = + TxnFun( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = mnesia:read({?GROUP_TABLE, GroupName}), @@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> end), {Result, Group}. -erase_members_in_group(Members, GroupName) -> +erase_members_in_group(Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], - {atomic, Group} = - mnesia:sync_transaction( + Group = + TxnFun( fun () -> [Group1 = #gm_group { members = [_|_] = Members1, version = Ver }] = @@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self, view = View0, members_state = MembersState, module = Module, - callback_args = Args }, View) -> + callback_args = Args, + txn_executor = TxnFun }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( @@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self, case Erasable of [] -> {ok, State1 #state { view = View }}; _ -> View1 = group_to_view( - erase_members_in_group(Erasable, GroupName)), + erase_members_in_group(Erasable, GroupName, TxnFun)), {callback_view_changed(Args, Module, View0, View1), check_neighbours(State1 #state { view = View1 })} end. diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 572175410d..5fbfc22371 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -105,7 +105,9 @@ spawn_member() -> random:seed(MegaSecs, Secs, MicroSecs), %% start up delay of no more than 10 seconds timer:sleep(random:uniform(10000)), - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []), + {ok, Pid} = gm:start_link( + ?MODULE, ?MODULE, [], + fun rabbit_misc:execute_mnesia_transaction/1), Start = random:uniform(10000), send_loop(Pid, Start, Start + random:uniform(10000)), gm:leave(Pid), diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index dad75bd447..84d4ab2fb1 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -44,7 +44,8 @@ terminate(Owner, _Reason) -> %% other wile_e_coyote(Time, WriteUnit) -> - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), receive joined -> ok end, timer:sleep(1000), %% wait for all to join timer:send_after(Time, stop), diff --git a/src/gm_tests.erl b/src/gm_tests.erl index 0a2d420469..a9c0ba9035 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -76,7 +76,9 @@ test_confirmed_broadcast() -> test_member_death() -> with_two_members( fun (Pid, Pid2) -> - {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid3} = gm:start_link( + ?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid3, [Pid, Pid2, Pid3], timeout_joining_gm_group_3), passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1), @@ -128,10 +130,12 @@ test_broadcast_fun(Fun) -> with_two_members(Fun) -> ok = gm:create_tables(), - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1), - {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2), passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2), diff --git a/src/rabbit.erl b/src/rabbit.erl index 93808f8413..69f77824f6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -300,9 +300,9 @@ start() -> %% We do not want to HiPE compile or upgrade %% mnesia after just restarting the app ok = ensure_application_loaded(), - ok = rabbit_node_monitor:prepare_cluster_status_files(), - ok = rabbit_mnesia:check_cluster_consistency(), ok = ensure_working_log_handlers(), + rabbit_node_monitor:prepare_cluster_status_files(), + rabbit_mnesia:check_cluster_consistency(), ok = app_utils:start_applications( app_startup_order(), fun handle_app_error/2), ok = print_plugin_info(rabbit_plugins:active()) @@ -312,13 +312,13 @@ boot() -> start_it(fun() -> ok = ensure_application_loaded(), maybe_hipe_compile(), - ok = rabbit_node_monitor:prepare_cluster_status_files(), ok = ensure_working_log_handlers(), + rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), %% It's important that the consistency check happens after %% the upgrade, since if we are a secondary node the %% primary node will have forgotten us - ok = rabbit_mnesia:check_cluster_consistency(), + rabbit_mnesia:check_cluster_consistency(), Plugins = rabbit_plugins:setup(), ToBeLoaded = Plugins ++ ?APPS, ok = app_utils:load_applications(ToBeLoaded), @@ -330,21 +330,26 @@ boot() -> end). handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - boot_error({could_not_start, App, Reason}, not_available); + throw({could_not_start, App, Reason}); handle_app_error(App, Reason) -> - boot_error({could_not_start, App, Reason}, not_available). + throw({could_not_start, App, Reason}). start_it(StartFun) -> try StartFun() + catch + throw:{could_not_start, _App, _Reason}=Err -> + boot_error(Err, not_available); + _:Reason -> + boot_error(Reason, erlang:get_stacktrace()) after %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> - rabbit_log:info("Stopping Rabbit~n"), + rabbit_log:info("Stopping RabbitMQ~n"), ok = app_utils:stop_applications(app_shutdown_order()). stop_and_halt() -> @@ -412,6 +417,9 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> + {ok, Vsn} = application:get_key(rabbit, vsn), + error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n", + [Vsn, erlang:system_info(otp_release)]), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), @@ -498,13 +506,16 @@ sort_boot_steps(UnsortedSteps) -> not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( + {missing_functions, MissingFunctions}, "Boot step functions not exported: ~p~n", [MissingFunctions]) end; {error, {vertex, duplicate, StepName}} -> - basic_boot_error("Duplicate boot step name: ~w~n", [StepName]); + basic_boot_error({duplicate_boot_step, StepName}, + "Duplicate boot step name: ~w~n", [StepName]); {error, {edge, Reason, From, To}} -> basic_boot_error( + {invalid_boot_step_dependency, From, To}, "Could not add boot step dependency of ~w on ~w:~n~s", [To, From, case Reason of @@ -518,7 +529,7 @@ sort_boot_steps(UnsortedSteps) -> end]) end. -boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> +boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = case AllNodes -- [node()] of @@ -529,23 +540,27 @@ boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns} end, - basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); - + basic_boot_error(Term, + Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); boot_error(Reason, Stacktrace) -> - Fmt = "Error description:~n ~p~n~n" + Fmt = "Error description:~n ~p~n~n" ++ "Log files (may contain more information):~n ~s~n ~s~n~n", Args = [Reason, log_location(kernel), log_location(sasl)], + boot_error(Reason, Fmt, Args, Stacktrace). + +boot_error(Reason, Fmt, Args, Stacktrace) -> case Stacktrace of - not_available -> basic_boot_error(Fmt, Args); - _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n", + not_available -> basic_boot_error(Reason, Fmt, Args); + _ -> basic_boot_error(Reason, Fmt ++ + "Stack trace:~n ~p~n~n", Args ++ [Stacktrace]) end. -basic_boot_error(Format, Args) -> +basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), - error_logger:error_msg(Format, Args), + rabbit_misc:local_info_msg(Format, Args), timer:sleep(1000), - exit({?MODULE, failure_during_boot}). + exit({?MODULE, failure_during_boot, Reason}). %%--------------------------------------------------------------------------- %% boot step functions diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a8b0ea2438..6ad85b24f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], - sync_slave_pids = []}), + sync_slave_pids = [], + gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f1821b5a39..68f95778b5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -86,6 +86,7 @@ -define(STATISTICS_KEYS, [pid, + policy, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -512,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}}, + State) -> + %% fake an 'eventual' confirm from BQ; noop if not needed + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + confirm_messages([MsgId], State), + BQS1 = BQ:discard(MsgId, SenderPid, BQS), + State1#q{backing_queue_state = BQS1}. + run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), @@ -520,17 +529,20 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, + Props = #message_properties{delivered = Delivered}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + fun (true, State1 = #q{backing_queue_state = BQS2}) -> {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, Props#message_properties.delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}} + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}}; + (false, State1) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); {published, BQS1} -> {true, State#q{backing_queue_state = BQS1}}; @@ -545,13 +557,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, case attempt_delivery(Delivery, Props, State1) of {true, State2} -> State2; - %% the next one is an optimisations - %% TODO: optimise the Confirm =/= never case too - {false, State2 = #q{ttl = 0, dlx = undefined, - backing_queue = BQ, backing_queue_state = BQS}} - when Confirm == never -> - BQS1 = BQ:discard(Message, SenderPid, BQS), - State2#q{backing_queue_state = BQS1}; + %% The next one is an optimisation + {false, State2 = #q{ttl = 0, dlx = undefined}} -> + discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, @@ -687,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = - case DLXFun of - undefined -> - {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> - {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - DLXFun(Msgs), - {Next, BQS2} - end, + {Props, BQS1} = case DLXFun of + undefined -> {Next, undefined, BQS2} = + BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> {Next, Msgs, BQS2} = + BQ:dropwhile(ExpirePred, true, BQS), + DLXFun(Msgs), + {Next, BQS2} + end, ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp @@ -867,6 +874,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> ExclusiveOwner; +i(policy, #q{q = #amqqueue{name = Name}}) -> + {ok, Q} = rabbit_amqqueue:lookup(Name), + case rabbit_policy:name(Q) of + none -> ''; + Policy -> Policy + end; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c6d1778532..af660c60a0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -84,12 +84,16 @@ %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). --callback publish_delivered(true, rabbit_types:basic_message(), +-callback publish_delivered(rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}. + -> {ack(), state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ for some +%% reason. Note that this may be invoked for messages for which +%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', +%% BQS}. +-callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -200,13 +204,6 @@ -callback is_duplicate(rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}. -%% Called to inform the BQ about messages which have reached the -%% queue, but are not going to be further passed to BQ for some -%% reason. Note that this is may be invoked for messages for which -%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', -%% BQS}. --callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). - -else. -export([behaviour_info/1]). @@ -214,12 +211,11 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, - {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, - {discard, 3}]; + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index e40d9b29e6..b37fbb29e2 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -119,7 +119,7 @@ qc_publish_multiple(#state{}) -> qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, - [boolean(), qc_message(), #message_properties{}, self(), BQ]}. + [qc_message(), #message_properties{}, self(), BQ]}. qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. @@ -199,7 +199,7 @@ next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) -> next_state(S, Res, {call, ?BQMOD, publish_delivered, - [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> + [Msg, MsgProps, _Pid, _BQ]}) -> #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S, AckTag = {call, erlang, element, [1, Res]}, BQ1 = {call, erlang, element, [2, Res]}, @@ -213,10 +213,7 @@ next_state(S, Res, true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end, - acks = case AckReq of - true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]; - false -> Acks - end + acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks] }; next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> @@ -391,4 +388,13 @@ drop_messages(Messages) -> end end. +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + -endif. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index e75e1f6f7c..25f7d758c6 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -70,6 +70,10 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, + {set_policy, [?VHOST_DEF]}, + {clear_policy, [?VHOST_DEF]}, + {list_policies, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -98,7 +102,9 @@ {"Bindings", rabbit_binding, info_all, info_keys}, {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys}, {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions, - vhost_perms_info_keys}]). + vhost_perms_info_keys}, + {"Policies", rabbit_policy, list_formatted, info_keys}, + {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). %%---------------------------------------------------------------------------- @@ -170,8 +176,8 @@ start() -> {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); - {parse_error, {_Line, Mod, Err}} -> - print_error("~s", [lists:flatten(Mod:format_error(Err))]), + {error_string, Reason} -> + print_error("~s", [Reason]), rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), @@ -458,6 +464,28 @@ action(list_parameters, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); +action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform) + when Prio == [] orelse length(Prio) == 1 -> + Msg = "Setting policy ~p for pattern ~p to ~p", + {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined}; + [P] -> {Msg ++ " with priority ~s", P} + end, + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(InformMsg, [Key, Pattern, Defn] ++ Prio), + rpc_call(Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]); + +action(clear_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); + +action(list_policies, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), + rabbit_policy:info_keys()); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || @@ -477,12 +505,14 @@ action(eval, Node, [Expr], _Opts, _Inform) -> Node, erl_eval, exprs, [Parsed, []]), io:format("~p~n", [Value]), ok; - {error, E} -> {parse_error, E} + {error, E} -> {error_string, format_parse_error(E)} end; {error, E, _} -> - {parse_error, E} + {error_string, format_parse_error(E)} end. +format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4cc96ef552..a205b23d0b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; -i(policy, X) -> rabbit_policy:name(X); +i(policy, X) -> case rabbit_policy:name(X) of + none -> ''; + Policy -> Policy + end; i(Item, _) -> throw({bad_argument, Item}). info(X = #exchange{}) -> infos(?INFO_KEYS, X). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index ba0cb04f71..cedbbdb380 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -144,11 +144,7 @@ gen_secure() -> %% employs base64url encoding, which is safer in more contexts than %% plain base64. string(G, Prefix) -> - Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; - ($\/, Acc) -> [$\_ | Acc]; - ($\=, Acc) -> Acc; - (Chr, Acc) -> [Chr | Acc] - end, [], base64:encode_to_string(G)). + Prefix ++ "-" ++ rabbit_misc:base64url(G). binary(G, Prefix) -> list_to_binary(string(G, Prefix)). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 6cd71fc314..e1a21cf786 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -326,7 +326,9 @@ ensure_monitoring(CPid, Pids) -> init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> GM1 = case GM of undefined -> - {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + {ok, GM2} = gm:start_link( + QueueName, ?MODULE, [self()], + fun rabbit_misc:execute_mnesia_transaction/1), receive {joined, GM2, _Members} -> ok end, @@ -349,7 +351,7 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 6dac280851..cce19c907a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,11 +17,11 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). + status/1, invoke/3, is_duplicate/2, fold/3]). -export([start/1, stop/0]). @@ -97,10 +97,18 @@ init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) -> ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State. -init_with_existing_bq(Q, BQ, BQS) -> +init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), depth_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue{gm_pids = GMPids}] + = mnesia:read({rabbit_queue, QName}), + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + end), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -157,7 +165,7 @@ stop_all_slaves(Reason, #state{gm = GM}) -> fun () -> [Q] = mnesia:read({rabbit_queue, QName}), rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { slave_pids = [] }) + Q #amqqueue { gm_pids = [], slave_pids = [] }) end), ok = gm:forget_group(QName). @@ -175,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). -publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, +publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS, ack_msg_id = AM }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast( - GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), - {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), - {AckTag, - ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 })}. + State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }, + {AckTag, ensure_monitoring(ChPid, State1)}. + +discard(MsgId, ChPid, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, MsgId}), + BQS1 = BQ:discard(MsgId, ChPid, BQS), + ensure_monitoring( + ChPid, State #state { + backing_queue_state = BQS1, + seen_status = dict:erase(MsgId, SS) }); + {ok, discarded} -> + State + end. dropwhile(Pred, AckRequired, State = #state{gm = GM, @@ -367,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId }, {discarded, State} end. -discard(Msg = #basic_message { id = MsgId }, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - %% It's a massive error if we get told to discard something that's - %% already been published or published-and-confirmed. To do that - %% would require non FIFO access. Hence we should not find - %% 'published' or 'confirmed' in this dict:find. - case dict:find(MsgId, SS) of - error -> - ok = gm:broadcast(GM, {discard, ChPid, Msg}), - ensure_monitoring( - ChPid, State #state { - backing_queue_state = BQ:discard(Msg, ChPid, BQS), - seen_status = dict:erase(MsgId, SS) }); - {ok, discarded} -> - State - end. - %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b6c229aac6..4a00846e4d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -15,22 +15,32 @@ %% -module(rabbit_mirror_queue_misc). +-behaviour(rabbit_policy_validator). --export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2, +-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2]). + is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only -export([suggested_queue_nodes/4]). -include("rabbit.hrl"). +-rabbit_boot_step({?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(remove_from_queue/2 :: - (rabbit_amqqueue:name(), [pid()]) +-spec(remove_from_queue/3 :: + (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). @@ -57,9 +67,7 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} - -remove_from_queue(QueueName, DeadGMPids) -> - DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], +remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -67,20 +75,27 @@ remove_from_queue(QueueName, DeadGMPids) -> case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> - Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + slave_pids = SPids, + gm_pids = GMPids }] -> + {Dead, GMPids1} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- Dead], + Alive = [QPid | SPids] -- DeadPids, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> + GMPids = GMPids1, %% ASSERTION {ok, QPid1, []}; - _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. store_updated_slaves( Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), + slave_pids = SPids1, + gm_pids = GMPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -156,10 +171,8 @@ add_mirror(QName, MirrorNode) -> start_child(Name, MirrorNode, Q); [SPid] -> case rabbit_misc:is_process_alive(SPid) of - true -> - {ok, already_mirrored}; - false -> - start_child(Name, MirrorNode, Q) + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q) end end end). @@ -325,3 +338,35 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. + +%%---------------------------------------------------------------------------- + +validate_policy(KeyList) -> + validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList, none)). + +validate_policy(<<"all">>, none) -> + ok; +validate_policy(<<"all">>, _Params) -> + {error, "ha-mode=\"all\" does not take parameters", []}; + +validate_policy(<<"nodes">>, []) -> + {error, "ha-mode=\"nodes\" list must be non-empty", []}; +validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) -> + case [I || I <- Nodes, not is_binary(I)] of + [] -> ok; + Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, " + "~p was not a string", [Invalid]} + end; +validate_policy(<<"nodes">>, Params) -> + {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}; + +validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 -> + ok; +validate_policy(<<"exactly">>, Params) -> + {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}; + +validate_policy(Mode, _Params) -> + {error, "~p is not a valid ha-mode value", [Mode]}. + diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9e2901265d..1ba1420f42 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -64,7 +64,6 @@ -record(state, { q, gm, - master_pid, backing_queue, backing_queue_state, sync_timer_ref, @@ -87,7 +86,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(#amqqueue { name = QueueName } = Q) -> +init(Q = #amqqueue { name = QName }) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -100,23 +99,24 @@ init(#amqqueue { name = QueueName } = Q) -> %% above. %% process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + {ok, GM} = gm:start_link(QName, ?MODULE, [self()], + fun rabbit_misc:execute_mnesia_transaction/1), receive {joined, GM} -> ok end, Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QueueName) end) of - {new, MPid} -> - erlang:monitor(process, MPid), + fun() -> init_it(Self, GM, Node, QName) end) of + {new, QPid} -> + erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, + Q1 = Q #amqqueue { pid = QPid }, + BQS = bq_init(BQ, Q1, false), + State = #state { q = Q1, gm = GM, - master_pid = MPid, backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, @@ -141,14 +141,15 @@ init(#amqqueue { name = QueueName } = Q) -> duplicate_live_master -> {stop, {duplicate_live_master, Node}}; existing -> + gm:leave(GM), ignore end. -init_it(Self, Node, QueueName) -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> add_slave(Q1, Self, MPids), +init_it(Self, GM, Node, QName) -> + [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] = + mnesia:read({rabbit_queue, QName}), + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of + [] -> add_slave(Q, Self, GM), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -156,15 +157,20 @@ init_it(Self, Node, QueueName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> add_slave(Q1, Self, MPids -- [SPid]), + false -> Q1 = Q#amqqueue { + slave_pids = SPids -- [SPid], + gm_pids = [T || T = {_, S} <- GMPids, + S =/= SPid] }, + add_slave(Q1, Self, GM), {new, QPid} end end. %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( - Q#amqqueue{slave_pids = MPids ++ [New]}). +add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. @@ -172,30 +178,29 @@ handle_call({deliver, Delivery, true}, From, State) -> noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, - State = #state { q = #amqqueue { name = QueueName }, - master_pid = MPid }) -> - %% The GM has told us about deaths, which means we're not going to - %% receive any more messages from GM - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> + Self = self(), + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; {ok, Pid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), - if node(Pid) =:= node(MPid) -> + case Pid of + MPid -> %% master hasn't changed gen_server2:reply(From, ok), noreply(State); - node(Pid) =:= node() -> + Self -> %% we've become master QueueState = promote_me(From, State), {become, rabbit_amqqueue_process, QueueState, hibernate}; - true -> - %% master has changed to not us. + _ -> + %% master has changed to not us gen_server2:reply(From, ok), erlang:monitor(process, Pid), - noreply(State #state { master_pid = Pid }) + noreply(State #state { q = Q #amqqueue { pid = Pid } }) end end; @@ -245,7 +250,7 @@ handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, - State = #state { gm = GM, master_pid = MPid }) -> + State = #state { gm = GM, q = #amqqueue { pid = MPid } }) -> ok = gm:broadcast(GM, process_death), noreply(State); @@ -368,7 +373,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; -i(master_pid, #state { master_pid = MPid }) -> MPid; +i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid; i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). @@ -387,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> - never; -needs_confirming(#delivery { message = #basic_message { - is_persistent = true } }, - #state { q = #amqqueue { durable = true } }) -> - eventually; -needs_confirming(_Delivery, _State) -> - immediately. +send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> + MS; +send_or_record_confirm(published, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + id = MsgId, + is_persistent = true } }, + MS, #state { q = #amqqueue { durable = true } }) -> + dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); +send_or_record_confirm(_Status, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo }, + MS, _State) -> + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), + MS. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {CMs, MS1} = @@ -621,9 +632,8 @@ confirm_sender_death(Pid) -> ok. maybe_enqueue_message( - Delivery = #delivery { message = #basic_message { id = MsgId }, - msg_seq_no = MsgSeqNo, - sender = ChPid }, + Delivery = #delivery { message = #basic_message { id = MsgId }, + sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. @@ -633,28 +643,11 @@ maybe_enqueue_message( MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; - {ok, confirmed} -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - {ok, published} -> - MS1 = case needs_confirming(Delivery, State1) of - never -> dict:erase(MsgId, MS); - eventually -> MMS = {published, ChPid, MsgSeqNo}, - dict:store(MsgId, MMS, MS); - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - dict:erase(MsgId, MS) - end, + {ok, Status} -> + MS1 = send_or_record_confirm( + Status, Delivery, dict:erase(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = MS1, - sender_queues = SQ1 }; - {ok, discarded} -> - %% We've already heard from GM that the msg is to be - %% discarded. We won't see this again. - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } end. @@ -672,39 +665,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) end. -process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - - %% We really are going to do the publish right now, even though we - %% may not have seen it directly from the channel. But we cannot - %% issues confirms until the latter has happened. So we need to - %% keep track of the MsgId and its confirmation status in the - %% meantime. +publish_or_discard(Status, ChPid, MsgId, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + %% We really are going to do the publish/discard right now, even + %% though we may not have seen it directly from the channel. But + %% we cannot issues confirms until the latter has happened. So we + %% need to keep track of the MsgId and its confirmation status in + %% the meantime. State1 = ensure_monitoring(ChPid, State), {MQ, PendingCh} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, published, MS)}; + dict:store(MsgId, Status, MS)}; {{value, Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }}, MQ2} -> + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> MS; - eventually -> MMS = {published, ChPid, MsgSeqNo}, - dict:store(MsgId, MMS , MS); - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - MS - end}; + send_or_record_confirm(Status, Delivery, MS, State1)}; {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} @@ -713,52 +693,28 @@ process_instruction( %% expecting any confirms from us. {MQ, PendingCh, MS} end, - - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, - - {ok, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State2 #state { backing_queue_state = BQS1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state { backing_queue_state = BQS1 }) - end}; -process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - %% Many of the comments around the publish head above apply here - %% too. - State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - {MQ1, PendingCh1, MS1} = - case queue:out(MQ) of - {empty, _MQ} -> - {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, discarded, MS)}; - {{value, #delivery { message = #basic_message { id = MsgId } }}, - MQ2} -> - %% We've already seen it from the channel, we're not - %% going to see this again, so don't add it to MS - {MQ2, PendingCh, MS}; - {{value, #delivery {}}, _MQ2} -> - %% The instruction was sent to us before we were - %% within the slave_pids within the #amqqueue{} - %% record. We'll never receive the message directly - %% from the channel. - {MQ, PendingCh, MS} - end, SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - BQS1 = BQ:discard(Msg, ChPid, BQS), - {ok, State1 #state { sender_queues = SQ1, - msg_id_status = MS1, - backing_queue_state = BQS1 }}; + State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. + + +process_instruction({publish, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({publish_delivered, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {ok, maybe_store_ack(true, MsgId, AckTag, + State1 #state { backing_queue_state = BQS1 })}; +process_instruction({discard, ChPid, MsgId}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(discarded, ChPid, MsgId, State), + BQS1 = BQ:discard(MsgId, ChPid, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a0536a50a9..ab9a9cebd4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -63,6 +63,7 @@ -export([version/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). +-export([base64url/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -227,6 +228,7 @@ -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). -spec(term_to_json/1 :: (any()) -> any()). +-spec(base64url/1 :: (binary()) -> string()). -endif. @@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) -> term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. + +base64url(In) -> + lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; + ($\/, Acc) -> [$\_ | Acc]; + ($\=, Acc) -> Acc; + (Chr, Acc) -> [Chr | Acc] + end, [], base64:encode_to_string(In))). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index f4c1f42b21..9af8fa1896 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -22,11 +22,13 @@ -include("rabbit.hrl"). --import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_misc, [pget/2]). -export([register/0]). -export([name/1, get/2, set/1]). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, + list_formatted/1, info_keys/0]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) -> get0(Name, match(EntityName, list(VHost))). get0(_Name, undefined) -> {error, not_found}; -get0(Name, List) -> case pget(<<"policy">>, List) of +get0(Name, List) -> case pget(definition, List) of undefined -> {error, not_found}; Policy -> case pget(Name, Policy) of undefined -> {error, not_found}; @@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of %%---------------------------------------------------------------------------- +parse_set(VHost, Name, Pattern, Definition, undefined) -> + parse_set0(VHost, Name, Pattern, Definition, 0); +parse_set(VHost, Name, Pattern, Definition, Priority) -> + try list_to_integer(Priority) of + Num -> parse_set0(VHost, Name, Pattern, Definition, Num) + catch + error:badarg -> {error, "~p priority must be a number", [Priority]} + end. + +parse_set0(VHost, Name, Pattern, Defn, Priority) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set0(VHost, Name, + [{<<"pattern">>, list_to_binary(Pattern)}, + {<<"definition">>, rabbit_misc:json_to_term(JSON)}, + {<<"priority">>, Priority}]); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Name, Pattern, Definition, Priority) -> + PolicyProps = [{<<"pattern">>, Pattern}, + {<<"definition">>, Definition}, + {<<"priority">>, case Priority of + undefined -> 0; + _ -> Priority + end}], + set0(VHost, Name, PolicyProps). + +set0(VHost, Name, Term) -> + rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term). + +delete(VHost, Name) -> + rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). + +lookup(VHost, Name) -> + case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of + not_found -> not_found; + P -> p(P, fun ident/1) + end. + +list() -> + list('_'). + +list(VHost) -> + list0(VHost, fun ident/1). + +list_formatted(VHost) -> + order_policies(list0(VHost, fun format/1)). + +list0(VHost, DefnFun) -> + [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. + +order_policies(PropList) -> + lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end, + PropList). + +p(Parameter, DefnFun) -> + Value = pget(value, Parameter), + [{vhost, pget(vhost, Parameter)}, + {name, pget(name, Parameter)}, + {pattern, pget(<<"pattern">>, Value)}, + {definition, DefnFun(pget(<<"definition">>, Value))}, + {priority, pget(<<"priority">>, Value)}]. + +format(Term) -> + {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), + list_to_binary(JSON). + +ident(X) -> X. + +info_keys() -> [vhost, name, pattern, definition, priority]. + +%%---------------------------------------------------------------------------- + validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). @@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) -> %%---------------------------------------------------------------------------- -list(VHost) -> - [[{<<"name">>, pget(key, P)} | pget(value, P)] - || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. - update_policies(VHost) -> Policies = list(VHost), {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( @@ -127,13 +200,52 @@ match(Name, Policies) -> end. matches(#resource{name = Name}, Policy) -> - match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]). + match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]). -sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0). +sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). %%---------------------------------------------------------------------------- policy_validation() -> - [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, - {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. + [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"definition">>, fun validation/2, mandatory}]. + +validation(_Name, []) -> + {error, "no policy provided", []}; +validation(_Name, Terms) when is_list(Terms) -> + {Keys, Modules} = lists:unzip( + rabbit_registry:lookup_all(policy_validator)), + [] = dups(Keys), %% ASSERTION + Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), + {TermKeys, _} = lists:unzip(Terms), + case dups(TermKeys) of + [] -> validation0(Validators, Terms); + Dup -> {error, "~p duplicate keys not allowed", [Dup]} + end; +validation(_Name, Term) -> + {error, "parse error while reading policy: ~p", [Term]}. + +validation0(Validators, Terms) -> + case lists:foldl( + fun (Mod, {ok, TermsLeft}) -> + ModKeys = proplists:get_all_values(Mod, Validators), + case [T || {Key, _} = T <- TermsLeft, + lists:member(Key, ModKeys)] of + [] -> {ok, TermsLeft}; + Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope} + end; + (_, Acc) -> + Acc + end, {ok, Terms}, proplists:get_keys(Validators)) of + {ok, []} -> + ok; + {ok, Unvalidated} -> + {error, "~p are not recognised policy settings", [Unvalidated]}; + {Error, _} -> + Error + end. + +a2b(A) -> list_to_binary(atom_to_list(A)). + +dups(L) -> L -- lists:usort(L). diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl new file mode 100644 index 0000000000..624c3d54f5 --- /dev/null +++ b/src/rabbit_policy_validator.erl @@ -0,0 +1,37 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy_validator). + +-ifdef(use_specs). + +-type(validate_results() :: + 'ok' | {error, string(), [term()]} | [validate_results()]). + +-callback validate_policy([{binary(), term()}]) -> validate_results(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {validate_policy, 1}, + ]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index e14bbba018..32709d2484 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) -> class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator. +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index b58b459a7f..4a83e61ff9 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). --export([parse_set/4, set/4, clear/3, - list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1, - lookup/3, value/3, value/4, info_keys/0]). +-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, + list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3, + value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -32,8 +32,12 @@ -> ok_or_error_string()). -spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) + -> ok_or_error_string()). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). +-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) + -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). @@ -57,29 +61,37 @@ %%--------------------------------------------------------------------------- -parse_set(VHost, Component, Key, String) -> +parse_set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; +parse_set(VHost, Component, Name, String) -> case rabbit_misc:json_decode(String) of - {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON)); + {ok, JSON} -> set(VHost, Component, Name, + rabbit_misc:json_to_term(JSON)); error -> {error_string, "JSON decoding error"} end. -set(VHost, Component, Key, Term) -> - case set0(VHost, Component, Key, Term) of - ok -> ok; - {errors, L} -> format_error(L) - end. +set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; +set(VHost, Component, Name, Term) -> + set_any(VHost, Component, Name, Term). format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. -set0(VHost, Component, Key, Term) -> +set_any(VHost, Component, Name, Term) -> + case set_any0(VHost, Component, Name, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +set_any0(VHost, Component, Name, Term) -> case lookup_component(Component) of {ok, Mod} -> - case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of + case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of ok -> - case mnesia_update(VHost, Component, Key, Term) of + case mnesia_update(VHost, Component, Name, Term) of {old, Term} -> ok; - _ -> Mod:notify(VHost, Component, Key, Term) + _ -> Mod:notify(VHost, Component, Name, Term) end, ok; E -> @@ -89,43 +101,49 @@ set0(VHost, Component, Key, Term) -> E end. -mnesia_update(VHost, Component, Key, Term) -> +mnesia_update(VHost, Component, Name, Term) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of + Res = case mnesia:read(?TABLE, {VHost, Component, Name}, read) of [] -> new; [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write), + ok = mnesia:write(?TABLE, c(VHost, Component, Name, Term), write), Res end). -clear(VHost, Component, Key) -> - case clear0(VHost, Component, Key) of +clear(_, <<"policy">> , _) -> + {error_string, "policies may not be cleared using this method"}; +clear(VHost, Component, Name) -> + clear_any(VHost, Component, Name). + +clear_any(VHost, Component, Name) -> + case clear_any0(VHost, Component, Name) of ok -> ok; {errors, L} -> format_error(L) end. -clear0(VHost, Component, Key) -> +clear_any0(VHost, Component, Name) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors( - Mod:validate_clear(VHost, Component, Key)) of - ok -> mnesia_clear(VHost, Component, Key), - Mod:notify_clear(VHost, Component, Key), + Mod:validate_clear(VHost, Component, Name)) of + ok -> mnesia_clear(VHost, Component, Name), + Mod:notify_clear(VHost, Component, Name), ok; E -> E end; E -> E end. -mnesia_clear(VHost, Component, Key) -> +mnesia_clear(VHost, Component, Name) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> - ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write) + ok = mnesia:delete(?TABLE, {VHost, Component, Name}, write) end). list() -> - [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- + rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. list(VHost) -> list(VHost, '_', []). list_strict(Component) -> list('_', Component, not_found). @@ -136,60 +154,63 @@ list(VHost, Component, Default) -> case component_good(Component) of true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, - [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- + mnesia:dirty_match_object(?TABLE, Match), + Comp =/= <<"policy">> orelse + Component =:= <<"policy">>]; _ -> Default end. list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. -lookup(VHost, Component, Key) -> - case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of +lookup(VHost, Component, Name) -> + case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Key) -> - case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of +value(VHost, Component, Name) -> + case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Key, Default) -> - Params = lookup0(VHost, Component, Key, +value(VHost, Component, Name, Default) -> + Params = lookup0(VHost, Component, Name, fun () -> - lookup_missing(VHost, Component, Key, Default) + lookup_missing(VHost, Component, Name, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Key, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of +lookup0(VHost, Component, Name, DefaultFun) -> + case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Key, Default) -> +lookup_missing(VHost, Component, Name, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Key}, read) of - [] -> Record = c(VHost, Component, Key, Default), + case mnesia:read(?TABLE, {VHost, Component, Name}, read) of + [] -> Record = c(VHost, Component, Name, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Key, Default) -> - #runtime_parameters{key = {VHost, Component, Key}, +c(VHost, Component, Name, Default) -> + #runtime_parameters{key = {VHost, Component, Name}, value = Default}. -p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) -> +p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> [{vhost, VHost}, {component, Component}, - {key, Key}, + {name, Name}, {value, Value}]. -info_keys() -> [component, key, value]. +info_keys() -> [component, name, value]. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index 5224ccaa36..d4d7271e90 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -16,9 +16,14 @@ -module(rabbit_runtime_parameters_test). -behaviour(rabbit_runtime_parameter). +-behaviour(rabbit_policy_validator). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -export([register/0, unregister/0]). +-export([validate_policy/1]). +-export([register_policy_validator/0, unregister_policy_validator/0]). + +%---------------------------------------------------------------------------- register() -> rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). @@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}. notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. + +%---------------------------------------------------------------------------- + +register_policy_validator() -> + rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE), + rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE). + +unregister_policy_validator() -> + rabbit_registry:unregister(policy_validator, <<"testeven">>), + rabbit_registry:unregister(policy_validator, <<"testpos">>). + +validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) -> + case length(Terms) rem 2 =:= 0 of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) -> + case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy(_) -> + {error, "meh", []}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2e26837df0..962bb64889 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -57,6 +57,7 @@ all_tests() -> passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), + passed = test_policy_validation(), passed = test_server_status(), passed = test_confirms(), passed = @@ -774,7 +775,9 @@ test_log_management_during_startup() -> ok = case catch control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, log_rotation_tty_no_handlers_test}); - {error, {cannot_log_to_tty, _, _}} -> ok + {badrpc, {'EXIT', {rabbit,failure_during_boot, + {error,{cannot_log_to_tty, + _, not_installed}}}}} -> ok end, %% fix sasl logging @@ -798,7 +801,9 @@ test_log_management_during_startup() -> ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, log_rotation_no_write_permission_dir_test}); - {error, {cannot_log_to_file, _, _}} -> ok + {badrpc, {'EXIT', + {rabbit, failure_during_boot, + {error, {cannot_log_to_file, _, _}}}}} -> ok end, %% start application with logging to a subdirectory which @@ -809,8 +814,11 @@ test_log_management_during_startup() -> ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, log_rotatation_parent_dirs_test}); - {error, {cannot_log_to_file, _, - {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok + {badrpc, + {'EXIT', {rabbit,failure_during_boot, + {error, {cannot_log_to_file, _, + {error, + {cannot_create_parent_dirs, _, eacces}}}}}}} -> ok end, ok = set_permissions(TmpDir, 8#00700), ok = set_permissions(TmpLog, 8#00600), @@ -1039,6 +1047,26 @@ test_runtime_parameters() -> rabbit_runtime_parameters_test:unregister(), passed. +test_policy_validation() -> + rabbit_runtime_parameters_test:register_policy_validator(), + SetPol = + fun (Key, Val) -> + control_action( + set_policy, + ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) + end, + + ok = SetPol("testeven", []), + ok = SetPol("testeven", [1, 2]), + ok = SetPol("testeven", [1, 2, 3, 4]), + ok = SetPol("testpos", [2, 5, 5678]), + + {error_string, _} = SetPol("testpos", [-1, 0, 1]), + {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + + rabbit_runtime_parameters_test:unregister_policy_validator(), + passed. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), @@ -1100,8 +1128,8 @@ test_server_status() -> ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), %% eval - {parse_error, _} = control_action(eval, ["\""]), - {parse_error, _} = control_action(eval, ["a("]), + {error_string, _} = control_action(eval, ["\""]), + {error_string, _} = control_action(eval, ["a("]), ok = control_action(eval, ["a."]), %% cleanup diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index ddc9c565b1..21fdcd667b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -42,6 +42,7 @@ [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). +-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). %% ------------------------------------------------------------------- @@ -66,6 +67,7 @@ -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). -endif. @@ -268,6 +270,19 @@ no_mirror_nodes() -> || T <- Tables], ok. +gm_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddGMPidsFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []} + end, + [ok = transform(T, AddGMPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy, gm_pids]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddb136a73d..8a3fd9d917 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,11 +17,11 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/5, drain_confirmed/1, + publish/4, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, ram_msg_count = RamMsgCount + 1, unconfirmed = UC1 })). -publish_delivered(false, #basic_message { id = MsgId }, - #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { async_callback = Callback, - len = 0 }) -> - case NeedsConfirming of - true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); - false -> ok - end, - {undefined, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, +publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, State = #vqstate { len = 0, @@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +discard(_MsgId, _ChPid, State) -> State. + drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of true -> {[], State}; %% common case @@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). is_duplicate(_Msg, State) -> {false, State}. -discard(_Msg, _ChPid, State) -> State. - %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -1325,12 +1316,9 @@ must_sync_index(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -blind_confirm(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). - msgs_written_to_disk(Callback, MsgIdSet, ignored) -> - blind_confirm(Callback, MsgIdSet); + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); msgs_written_to_disk(Callback, MsgIdSet, written) -> Callback(?MODULE, fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 03dfbe245d..297fa56fe3 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -97,6 +97,8 @@ internal_delete(VHostPath) -> proplists:get_value(component, Info), proplists:get_value(key, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. |
