author    Michael Klishin <mklishin@pivotal.io>  2016-04-20 09:03:52 +0300
committer Michael Klishin <mklishin@pivotal.io>  2016-04-20 09:03:52 +0300
commit    eb1f6e5cdd8e53164135c10dc79f9a9496d53755
tree      e0e429f8adbd81e378c776547c462669eee14fcc /src
parent    628d803b95a6aa93e82e6b6e53dce70a7513a522
parent    57c4a7af72268b8cf88efd2ec774616dd14aa31f
Merge branch 'stable' into rabbitmq-server-343bis
Diffstat (limited to 'src')

 src/gm.erl                         | 351
 src/rabbit.erl                     | 142
 src/rabbit_cli.erl                 |  32
 src/rabbit_control_main.erl        |  42
 src/rabbit_disk_monitor.erl        |   2
 src/rabbit_log.erl                 |  10
 src/rabbit_mirror_queue_master.erl |  10
 src/rabbit_vm.erl                  |   2

 8 files changed, 424 insertions(+), 167 deletions(-)
diff --git a/src/gm.erl b/src/gm.erl
index aeb050e15f..199cf7c4de 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -617,14 +617,20 @@ handle_call({add_on_right, NewMember}, _From,
                              group_name    = GroupName,
                              members_state = MembersState,
                              txn_executor  = TxnFun }) ->
-    Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
-    View1 = group_to_view(Group),
-    MembersState1 = remove_erased_members(MembersState, View1),
-    ok = send_right(NewMember, View1,
-                    {catchup, Self, prepare_members_state(MembersState1)}),
-    {Result, State1} = change_view(View1, State #state {
-                                            members_state = MembersState1 }),
-    handle_callback_result({Result, {ok, Group}, State1}).
+    try
+        Group = record_new_member_in_group(
+                  NewMember, Self, GroupName, TxnFun),
+        View1 = group_to_view(check_membership(Self, Group)),
+        MembersState1 = remove_erased_members(MembersState, View1),
+        ok = send_right(NewMember, View1,
+                        {catchup, Self, prepare_members_state(MembersState1)}),
+        {Result, State1} = change_view(View1, State #state {
+                                                members_state = MembersState1 }),
+        handle_callback_result({Result, {ok, Group}, State1})
+    catch
+        lost_membership ->
+            {stop, normal, State}
+    end.
 
 %% add_on_right causes a catchup to be sent immediately from the left,
 %% so we can never see this from the left neighbour. However, it's
@@ -638,19 +644,28 @@ handle_cast({?TAG, _ReqVer, check_neighbours},
 
 handle_cast({?TAG, ReqVer, Msg},
             State = #state { view          = View,
+                             self          = Self,
                              members_state = MembersState,
                              group_name    = GroupName }) ->
-    {Result, State1} =
-        case needs_view_update(ReqVer, View) of
-            true  -> View1 = group_to_view(dirty_read_group(GroupName)),
-                     MemberState1 = remove_erased_members(MembersState, View1),
-                     change_view(View1, State #state {
-                                          members_state = MemberState1 });
-            false -> {ok, State}
-        end,
-    handle_callback_result(
-      if_callback_success(
-        Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+    try
+        {Result, State1} =
+            case needs_view_update(ReqVer, View) of
+                true ->
+                    View1 = group_to_view(
+                              check_membership(Self,
+                                               dirty_read_group(GroupName))),
+                    MemberState1 = remove_erased_members(MembersState, View1),
+                    change_view(View1, State #state {
+                                         members_state = MemberState1 });
+                false -> {ok, State}
+            end,
+        handle_callback_result(
+          if_callback_success(
+            Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
+    catch
+        lost_membership ->
+            {stop, normal, State}
+    end;
 
 handle_cast({broadcast, _Msg, _SizeHint},
             State = #state { shutting_down = {true, _} }) ->
@@ -724,39 +739,44 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
                              group_name    = GroupName,
                              confirms      = Confirms,
                              txn_executor  = TxnFun }) ->
-    Member = case {Left, Right} of
-                 {{Member1, MRef}, _} -> Member1;
-                 {_, {Member1, MRef}} -> Member1;
-                 _                    -> undefined
-             end,
-    case {Member, Reason} of
-        {undefined, _} ->
-            noreply(State);
-        {_, {shutdown, ring_shutdown}} ->
-            noreply(State);
-        _ ->
-            %% In the event of a partial partition we could see another member
-            %% go down and then remove them from Mnesia. While they can
-            %% recover from this they'd have to restart the queue - not
-            %% ideal. So let's sleep here briefly just in case this was caused
-            %% by a partial partition; in which case by the time we record the
-            %% member death in Mnesia we will probably be in a full
-            %% partition and will not be assassinating another member.
-            timer:sleep(100),
-            View1 = group_to_view(record_dead_member_in_group(
-                                    Member, GroupName, TxnFun)),
-            handle_callback_result(
-              case alive_view_members(View1) of
-                  [Self] -> maybe_erase_aliases(
-                              State #state {
-                                members_state = blank_member_state(),
-                                confirms      = purge_confirms(Confirms) },
-                              View1);
-                  _      -> change_view(View1, State)
-              end)
+    try
+        check_membership(GroupName),
+        Member = case {Left, Right} of
+                     {{Member1, MRef}, _} -> Member1;
+                     {_, {Member1, MRef}} -> Member1;
+                     _                    -> undefined
+                 end,
+        case {Member, Reason} of
+            {undefined, _} ->
+                noreply(State);
+            {_, {shutdown, ring_shutdown}} ->
+                noreply(State);
+            _ ->
+                %% In the event of a partial partition we could see another member
+                %% go down and then remove them from Mnesia. While they can
+                %% recover from this they'd have to restart the queue - not
+                %% ideal. So let's sleep here briefly just in case this was caused
+                %% by a partial partition; in which case by the time we record the
+                %% member death in Mnesia we will probably be in a full
+                %% partition and will not be assassinating another member.
+                timer:sleep(100),
+                View1 = group_to_view(record_dead_member_in_group(Self,
+                                        Member, GroupName, TxnFun, true)),
+                handle_callback_result(
+                  case alive_view_members(View1) of
+                      [Self] -> maybe_erase_aliases(
+                                  State #state {
+                                    members_state = blank_member_state(),
+                                    confirms      = purge_confirms(Confirms) },
+                                  View1);
+                      _      -> change_view(View1, State)
+                  end)
+        end
+    catch
+        lost_membership ->
+            {stop, normal, State}
     end.
-
 terminate(Reason, #state { module = Module, callback_args = Args }) ->
     Module:handle_terminate(Args, Reason).
 
@@ -841,52 +861,30 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) ->
 
 handle_msg({activity, Left, Activity},
            State = #state { self          = Self,
+                            group_name    = GroupName,
                             left          = {Left, _MRefL},
                             view          = View,
                             members_state = MembersState,
                             confirms      = Confirms })
   when MembersState =/= undefined ->
-    {MembersState1, {Confirms1, Activity1}} =
-        lists:foldl(
-          fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
-                  with_member_acc(
-                    fun (Member = #member { pending_ack = PA,
-                                            last_pub    = LP,
-                                            last_ack    = LA },
-                         {Confirms2, Activity2}) ->
-                            case is_member_alias(Id, Self, View) of
-                                true ->
-                                    {ToAck, PA1} =
-                                        find_common(queue_from_pubs(Pubs), PA,
-                                                    queue:new()),
-                                    LA1 = last_ack(Acks, LA),
-                                    AckNums = acks_from_queue(ToAck),
-                                    Confirms3 = maybe_confirm(
-                                                  Self, Id, Confirms2, AckNums),
-                                    {Member #member { pending_ack = PA1,
-                                                      last_ack    = LA1 },
-                                     {Confirms3,
-                                      activity_cons(
-                                        Id, [], AckNums, Activity2)}};
-                                false ->
-                                    PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
-                                    LA1 = last_ack(Acks, LA),
-                                    LP1 = last_pub(Pubs, LP),
-                                    {Member #member { pending_ack = PA1,
-                                                      last_pub    = LP1,
-                                                      last_ack    = LA1 },
-                                     {Confirms2,
-                                      activity_cons(Id, Pubs, Acks, Activity2)}}
-                            end
-                    end, Id, MembersStateConfirmsActivity)
-          end, {MembersState, {Confirms, activity_nil()}}, Activity),
-    State1 = State #state { members_state = MembersState1,
-                            confirms      = Confirms1 },
-    Activity3 = activity_finalise(Activity1),
-    ok = maybe_send_activity(Activity3, State1),
-    {Result, State2} = maybe_erase_aliases(State1, View),
-    if_callback_success(
-      Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
+    try
+        %% If we have to stop, do it asap so we avoid any ack confirmation
+        %% Membership must be checked again by erase_members_in_group, as the
+        %% node can be marked as dead on the meanwhile
+        check_membership(GroupName),
+        {MembersState1, {Confirms1, Activity1}} =
+            calculate_activity(MembersState, Confirms, Activity,
+                               Self, View),
+        State1 = State #state { members_state = MembersState1,
+                                confirms      = Confirms1 },
+        Activity3 = activity_finalise(Activity1),
+        ok = maybe_send_activity(Activity3, State1),
+        {Result, State2} = maybe_erase_aliases(State1, View),
+        if_callback_success(
+          Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
+    catch
+        lost_membership ->
+            {{stop, normal}, State}
+    end;
 
 handle_msg({activity, _NotLeft, _Activity}, State) ->
     {ok, State}.
@@ -1091,8 +1089,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
                       fun () ->
                               join_group(
                                 Self, GroupName,
-                                record_dead_member_in_group(
-                                  Left, GroupName, TxnFun),
+                                record_dead_member_in_group(Self,
+                                  Left, GroupName, TxnFun, false),
                                 TxnFun)
                       end,
                   try
@@ -1142,47 +1140,84 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
               end
       end).
 
-record_dead_member_in_group(Member, GroupName, TxnFun) ->
-    TxnFun(
-      fun () ->
-              Group = #gm_group { members = Members, version = Ver } =
-                  read_group(GroupName),
-              case lists:splitwith(
-                     fun (Member1) -> Member1 =/= Member end, Members) of
-                  {_Members1, []} -> %% not found - already recorded dead
-                      Group;
-                  {Members1, [Member | Members2]} ->
-                      Members3 = Members1 ++ [{dead, Member} | Members2],
-                      write_group(Group #gm_group { members = Members3,
-                                                    version = Ver + 1 })
-              end
-      end).
+record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
+    Fun =
+        fun () ->
+                try
+                    Group = #gm_group { members = Members, version = Ver } =
+                        case Verify of
+                            true ->
+                                check_membership(Self, read_group(GroupName));
+                            false ->
+                                read_group(GroupName)
+                        end,
+                    case lists:splitwith(
+                           fun (Member1) -> Member1 =/= Member end, Members) of
+                        {_Members1, []} -> %% not found - already recorded dead
+                            Group;
+                        {Members1, [Member | Members2]} ->
+                            Members3 = Members1 ++ [{dead, Member} | Members2],
+                            write_group(Group #gm_group { members = Members3,
                                                          version = Ver + 1 })
+                    end
+                catch
+                    lost_membership ->
+                        %% The transaction must not be abruptly crashed, but
+                        %% leave the gen_server to stop normally
+                        {error, lost_membership}
+                end
+        end,
+    handle_lost_membership_in_txn(TxnFun, Fun).
+
+handle_lost_membership_in_txn(TxnFun, Fun) ->
+    case TxnFun(Fun) of
+        {error, lost_membership = T} ->
+            throw(T);
+        Any ->
+            Any
+    end.
 
 record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
-    TxnFun(
-      fun () ->
-              Group = #gm_group { members = Members, version = Ver } =
-                  read_group(GroupName),
-              {Prefix, [Left | Suffix]} =
-                  lists:splitwith(fun (M) -> M =/= Left end, Members),
-              write_group(Group #gm_group {
-                            members = Prefix ++ [Left, NewMember | Suffix],
-                            version = Ver + 1 })
-      end).
+    Fun =
+        fun () ->
+                try
+                    Group = #gm_group { members = Members, version = Ver } =
+                        check_membership(Left, read_group(GroupName)),
+                    {Prefix, [Left | Suffix]} =
+                        lists:splitwith(fun (M) -> M =/= Left end, Members),
+                    write_group(Group #gm_group {
+                                  members = Prefix ++ [Left, NewMember | Suffix],
+                                  version = Ver + 1 })
+                catch
+                    lost_membership ->
+                        %% The transaction must not be abruptly crashed, but
+                        %% leave the gen_server to stop normally
+                        {error, lost_membership}
+                end
+        end,
+    handle_lost_membership_in_txn(TxnFun, Fun).
 
-erase_members_in_group(Members, GroupName, TxnFun) ->
+erase_members_in_group(Self, Members, GroupName, TxnFun) ->
     DeadMembers = [{dead, Id} || Id <- Members],
-    TxnFun(
-      fun () ->
-              Group = #gm_group { members = [_|_] = Members1, version = Ver } =
-                  read_group(GroupName),
-              case Members1 -- DeadMembers of
-                  Members1 -> Group;
-                  Members2 -> write_group(
-                                Group #gm_group { members = Members2,
-                                                  version = Ver + 1 })
+    Fun =
+        fun () ->
+                try
+                    Group = #gm_group { members = [_|_] = Members1, version = Ver } =
+                        check_membership(Self, read_group(GroupName)),
+                    case Members1 -- DeadMembers of
+                        Members1 -> Group;
+                        Members2 -> write_group(
+                                      Group #gm_group { members = Members2,
+                                                        version = Ver + 1 })
+                    end
+                catch
+                    lost_membership ->
+                        %% The transaction must not be abruptly crashed, but
+                        %% leave the gen_server to stop normally
+                        {error, lost_membership}
                 end
-      end).
+        end,
+    handle_lost_membership_in_txn(TxnFun, Fun).
 
 maybe_erase_aliases(State = #state { self          = Self,
                                      group_name    = GroupName,
@@ -1203,7 +1238,7 @@ maybe_erase_aliases(State = #state { self          = Self,
     View1 = case Erasable of
                 []    -> View;
                 _     -> group_to_view(
-                           erase_members_in_group(Erasable, GroupName, TxnFun))
+                           erase_members_in_group(Self, Erasable, GroupName, TxnFun))
             end,
     change_view(View1, State #state { members_state = MembersState1 }).
 
@@ -1378,6 +1413,41 @@ maybe_send_activity(Activity, #state { self   = Self,
 send_right(Right, View, Msg) ->
     ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
 
+calculate_activity(MembersState, Confirms, Activity, Self, View) ->
+    lists:foldl(
+      fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
+              with_member_acc(
+                fun (Member = #member { pending_ack = PA,
+                                        last_pub    = LP,
+                                        last_ack    = LA },
+                     {Confirms2, Activity2}) ->
+                        case is_member_alias(Id, Self, View) of
+                            true ->
+                                {ToAck, PA1} =
+                                    find_common(queue_from_pubs(Pubs), PA,
+                                                queue:new()),
+                                LA1 = last_ack(Acks, LA),
+                                AckNums = acks_from_queue(ToAck),
+                                Confirms3 = maybe_confirm(
+                                              Self, Id, Confirms2, AckNums),
+                                {Member #member { pending_ack = PA1,
+                                                  last_ack    = LA1 },
+                                 {Confirms3,
+                                  activity_cons(
+                                    Id, [], AckNums, Activity2)}};
+                            false ->
+                                PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
+                                LA1 = last_ack(Acks, LA),
+                                LP1 = last_pub(Pubs, LP),
+                                {Member #member { pending_ack = PA1,
+                                                  last_pub    = LP1,
+                                                  last_ack    = LA1 },
+                                 {Confirms2,
+                                  activity_cons(Id, Pubs, Acks, Activity2)}}
+                        end
+                end, Id, MembersStateConfirmsActivity)
+      end, {MembersState, {Confirms, activity_nil()}}, Activity).
+
 callback(Args, Module, Activity) ->
     Result =
         lists:foldl(
@@ -1530,3 +1600,24 @@ call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
 cast(Pid, Msg)          -> gen_server2:cast(Pid, Msg).
 monitor(Pid)            -> erlang:monitor(process, Pid).
 demonitor(MRef)         -> erlang:demonitor(MRef).
+
+check_membership(Self, #gm_group{members = M} = Group) ->
+    case lists:member(Self, M) of
+        true ->
+            Group;
+        false ->
+            throw(lost_membership)
+    end.
+
+check_membership(GroupName) ->
+    case dirty_read_group(GroupName) of
+        #gm_group{members = M} ->
+            case lists:keymember(self(), 2, M) of
+                true ->
+                    ok;
+                false ->
+                    throw(lost_membership)
+            end;
+        {error, not_found} ->
+            throw(lost_membership)
+    end.
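The gm.erl changes above all follow one pattern: membership is re-checked before acting, a lost_membership throw inside the Mnesia transaction fun is converted to {error, lost_membership} so the transaction itself is not crashed, and handle_lost_membership_in_txn/2 re-throws it outside the transaction, where the gen_server catches it and stops normally. A minimal, self-contained sketch of that throw/re-throw flow; the module name, run_txn/1 (a stand-in for the real TxnFun) and demo/1 are illustrative, not part of the commit:

    -module(lost_membership_demo).
    -export([demo/1]).

    %% Stand-in for TxnFun: the real one runs Fun via mnesia:transaction/1.
    run_txn(Fun) -> Fun().

    update_group(Members) ->
        Fun = fun () ->
                      try
                          %% check_membership analogue: throw if we are gone
                          case lists:member(self(), Members) of
                              true  -> {ok, Members};
                              false -> throw(lost_membership)
                          end
                      catch
                          %% do not crash the transaction; report instead
                          lost_membership -> {error, lost_membership}
                      end
              end,
        case run_txn(Fun) of
            {error, lost_membership = T} -> throw(T); %% re-throw outside txn
            Other                        -> Other
        end.

    %% The gen_server equivalent: catch the throw, stop normally.
    demo(Members) ->
        try
            update_group(Members)
        catch
            lost_membership -> {stop, normal}
        end.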
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 81c7eee580..2cc353d7b8 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -284,16 +284,120 @@ broker_start() ->
     Plugins = rabbit_plugins:setup(),
     ToBeLoaded = Plugins ++ ?APPS,
     start_apps(ToBeLoaded),
-    case os:type() of
-        {win32, _} -> ok;
-        _          -> case code:load_file(sd_notify) of
-                          {module, sd_notify} -> SDNotify = sd_notify,
-                                                 SDNotify:sd_notify(0, "READY=1");
-                          {error, _} -> os:cmd("systemd-notify --ready")
-                      end
-    end,
+    maybe_sd_notify(),
     ok = log_broker_started(rabbit_plugins:active()).
 
+%% Try to send systemd ready notification if it makes sense in the
+%% current environment. standard_error is used intentionally in all
+%% logging statements, so all this messages will end in systemd
+%% journal.
+maybe_sd_notify() ->
+    case sd_notify_ready() of
+        false ->
+            io:format(standard_error, "systemd READY notification failed, beware of timeouts~n", []);
+        _ ->
+            ok
+    end.
+
+sd_notify_ready() ->
+    case {os:type(), os:getenv("NOTIFY_SOCKET")} of
+        {{win32, _}, _} ->
+            true;
+        {_, [_|_]} -> %% Non-empty NOTIFY_SOCKET, give it a try
+            sd_notify_legacy() orelse sd_notify_socat();
+        _ ->
+            true
+    end.
+
+sd_notify_data() ->
+    "READY=1\nSTATUS=Initialized\nMAINPID=" ++ os:getpid() ++ "\n".
+
+sd_notify_legacy() ->
+    case code:load_file(sd_notify) of
+        {module, sd_notify} ->
+            SDNotify = sd_notify,
+            SDNotify:sd_notify(0, sd_notify_data()),
+            true;
+        {error, _} ->
+            false
+    end.
+
+%% socat(1) is the most portable way the sd_notify could be
+%% implemented in erlang, without introducing some NIF. Currently the
+%% following issues prevent us from implementing it in a more
+%% reasonable way:
+%% - systemd-notify(1) is unstable for non-root users
+%% - erlang doesn't support unix domain sockets.
+%%
+%% Some details on how we ended with such a solution:
+%%   https://github.com/rabbitmq/rabbitmq-server/issues/664
+sd_notify_socat() ->
+    case sd_current_unit() of
+        {ok, Unit} ->
+            io:format(standard_error, "systemd unit for activation check: \"~s\"~n", [Unit]),
+            sd_notify_socat(Unit);
+        _ ->
+            false
+    end.
+
+socat_socket_arg("@" ++ AbstractUnixSocket) ->
+    "abstract-sendto:" ++ AbstractUnixSocket;
+socat_socket_arg(UnixSocket) ->
+    "unix-sendto:" ++ UnixSocket.
+
+sd_open_port() ->
+    open_port(
+      {spawn_executable, os:find_executable("socat")},
+      [{args, [socat_socket_arg(os:getenv("NOTIFY_SOCKET")), "STDIO"]},
+       use_stdio, out]).
+
+sd_notify_socat(Unit) ->
+    case sd_open_port() of
+        {'EXIT', Exit} ->
+            io:format(standard_error, "Failed to start socat ~p~n", [Exit]),
+            false;
+        Port ->
+            Port ! {self(), {command, sd_notify_data()}},
+            Result = sd_wait_activation(Port, Unit),
+            port_close(Port),
+            Result
+    end.
+
+sd_current_unit() ->
+    case catch re:run(os:cmd("systemctl status " ++ os:getpid()), "([-.@0-9a-zA-Z]+)", [unicode, {capture, all_but_first, list}]) of
+        {'EXIT', _} ->
+            error;
+        {match, [Unit]} ->
+            {ok, Unit};
+        _ ->
+            error
+    end.
+
+sd_wait_activation(Port, Unit) ->
+    case os:find_executable("systemctl") of
+        false ->
+            io:format(standard_error, "'systemctl' unavailable, falling back to sleep~n", []),
+            timer:sleep(5000),
+            true;
+        _ ->
+            sd_wait_activation(Port, Unit, 10)
+    end.
+
+sd_wait_activation(_, _, 0) ->
+    io:format(standard_error, "Service still in 'activating' state, bailing out~n", []),
+    false;
+sd_wait_activation(Port, Unit, AttemptsLeft) ->
+    case os:cmd("systemctl show --property=ActiveState " ++ Unit) of
+        "ActiveState=activating\n" ->
+            timer:sleep(1000),
+            sd_wait_activation(Port, Unit, AttemptsLeft - 1);
+        "ActiveState=" ++ _ ->
+            true;
+        _ = Err ->
+            io:format(standard_error, "Unexpected status from systemd ~p~n", [Err]),
+            false
+    end.
+
 start_it(StartFun) ->
     Marker = spawn_link(fun() -> receive stop -> ok end end),
     case catch register(rabbit_boot, Marker) of
@@ -332,6 +436,10 @@ stop_and_halt() ->
         stop()
     after
         rabbit_log:info("Halting Erlang VM~n", []),
+        %% Also duplicate this information to stderr, so console where
+        %% foreground broker was running (or systemd journal) will
+        %% contain information about graceful termination.
+        io:format(standard_error, "Gracefully halting Erlang VM~n", []),
        init:stop()
     end,
     ok.
@@ -693,7 +801,8 @@ print_banner() ->
               "~n  ##########  Logs: ~s"
               "~n  ######  ##        ~s"
               "~n  ##########"
-              "~n              Starting broker...",
+              "~n              Starting broker..."
+              "~n",
               [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
                log_location(kernel), log_location(sasl)]).
 
@@ -722,11 +831,16 @@ log_banner() ->
     rabbit_log:info("~s", [Banner]).
 
 warn_if_kernel_config_dubious() ->
-    case erlang:system_info(kernel_poll) of
-        true  -> ok;
-        false -> rabbit_log:warning(
-                   "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
-                   "and CPU utilization may worsen.~n")
+    case os:type() of
+        {win32, _} ->
+            ok;
+        _ ->
+            case erlang:system_info(kernel_poll) of
+                true  -> ok;
+                false -> rabbit_log:warning(
+                           "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+                           "and CPU utilization may worsen.~n")
+            end
     end,
     AsyncThreads = erlang:system_info(thread_pool_size),
     case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
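In the socat-based notification above, NOTIFY_SOCKET values beginning with "@" refer to Linux abstract-namespace sockets, which need a different socat address type than filesystem sockets; socat_socket_arg/1 encodes that distinction. A standalone sketch of just that function; the socket paths in demo/0 are illustrative values, not ones systemd guarantees:

    -module(sd_socket_demo).
    -export([demo/0]).

    %% Same shape as socat_socket_arg/1 in the diff above.
    socat_socket_arg("@" ++ AbstractUnixSocket) ->
        "abstract-sendto:" ++ AbstractUnixSocket;
    socat_socket_arg(UnixSocket) ->
        "unix-sendto:" ++ UnixSocket.

    demo() ->
        "abstract-sendto:/org/freedesktop/systemd1/notify" =
            socat_socket_arg("@/org/freedesktop/systemd1/notify"),
        "unix-sendto:/run/systemd/notify" =
            socat_socket_arg("/run/systemd/notify"),
        ok.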
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 6679d9329e..4151504956 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -18,17 +18,19 @@
 -include("rabbit_cli.hrl").
 
 -export([main/3, start_distribution/0, start_distribution/1,
-         parse_arguments/4, rpc_call/4, rpc_call/5, rpc_call/7]).
+         parse_arguments/4, filter_opts/2,
+         rpc_call/4, rpc_call/5, rpc_call/7]).
 
 %%----------------------------------------------------------------------------
 
 -ifdef(use_specs).
 
+-type(option_name() :: string()).
+-type(option_value() :: string() | node() | boolean()).
 -type(optdef() :: flag | {option, string()}).
--type(parse_result() :: {'ok', {atom(), [{string(), string()}], [string()]}} |
+-type(parse_result() :: {'ok', {atom(), [{option_name(), option_value()}], [string()]}} |
                         'no_command').
 
-
 -spec(main/3 :: (fun (([string()], string()) -> parse_result()),
                  fun ((atom(), atom(), [any()], [any()]) -> any()),
                  atom()) -> no_return()).
@@ -38,6 +40,9 @@
 -spec(parse_arguments/4 ::
         ([{atom(), [{string(), optdef()}]} | atom()], [{string(), optdef()}],
          string(), [string()]) -> parse_result()).
+
+-spec(filter_opts/2 :: ([{option_name(), option_value()}], [option_name()]) -> [boolean()]).
+
 -spec(rpc_call/4 :: (node(), atom(), atom(), [any()]) -> any()).
 -spec(rpc_call/5 :: (node(), atom(), atom(), [any()], number()) -> any()).
 -spec(rpc_call/7 :: (node(), atom(), atom(), [any()], reference(), pid(),
@@ -117,7 +122,10 @@ main(ParseFun, DoFun, UsageMod) ->
                 _ ->
                     print_error("unable to connect to node ~w: ~w", [Node, Reason]),
                     print_badrpc_diagnostics([Node]),
-                    rabbit_misc:quit(?EX_UNAVAILABLE)
+                    case Command of
+                        stop -> rabbit_misc:quit(?EX_OK);
+                        _    -> rabbit_misc:quit(?EX_UNAVAILABLE)
+                    end
             end;
         {badrpc_multi, Reason, Nodes} ->
             print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]),
@@ -241,6 +249,22 @@ process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
         {none, _, _}        -> no_command
     end.
 
+%% When we have a set of flags that are used for filtering, we want by
+%% default to include every such option in our output. But if a user
+%% explicitly specified any such flag, we want to include only items
+%% which he has requested.
+filter_opts(CurrentOptionValues, AllOptionNames) ->
+    Explicit = lists:map(fun(OptName) ->
+                                 proplists:get_bool(OptName, CurrentOptionValues)
+                         end,
+                         AllOptionNames),
+    case lists:member(true, Explicit) of
+        true ->
+            Explicit;
+        false ->
+            lists:duplicate(length(AllOptionNames), true)
+    end.
+
 %%----------------------------------------------------------------------------
 
 fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
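To make the filter_opts/2 contract above concrete: with no filtering flag given explicitly, every item is selected; once any flag is given, only the explicitly requested ones are. A standalone sketch that mirrors the implementation; the "--online"/"--offline" flag names are illustrative:

    -module(filter_opts_demo).
    -export([demo/0]).

    %% Mirrors rabbit_cli:filter_opts/2 from the diff above.
    filter_opts(CurrentOptionValues, AllOptionNames) ->
        Explicit = [proplists:get_bool(Name, CurrentOptionValues)
                    || Name <- AllOptionNames],
        case lists:member(true, Explicit) of
            true  -> Explicit;
            false -> lists:duplicate(length(AllOptionNames), true)
        end.

    demo() ->
        %% nothing requested explicitly -> everything is selected
        [true, true]  = filter_opts([], ["--online", "--offline"]),
        %% one explicit flag -> only that one is selected
        [true, false] = filter_opts([{"--online", true}],
                                    ["--online", "--offline"]),
        ok.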
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index b805d21e48..f63694b657 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -73,7 +73,7 @@
          {clear_policy, [?VHOST_DEF]},
          {list_policies, [?VHOST_DEF]},
 
-         {list_queues, [?VHOST_DEF]},
+         {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
          {list_exchanges, [?VHOST_DEF]},
          {list_bindings, [?VHOST_DEF]},
          {list_connections, [?VHOST_DEF]},
@@ -508,9 +508,15 @@ action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
     PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
     ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
     Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
-    rpc_call(
+    Res = rpc_call(
       Node, rabbit_policy, parse_set,
-      [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
+      [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]),
+    case Res of
+        {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
+            {error_string, rabbit_misc:format(Format, Args)};
+        _ ->
+            Res
+    end;
 
 action(clear_policy, Node, [Key], Opts, Inform) ->
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
@@ -613,10 +619,11 @@ action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout)
                       true);
 
 action(list_queues, Node, Args, Opts, Inform, Timeout) ->
+    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
     Inform("Listing queues", []),
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
     ArgAtoms = default_if_empty(Args, [name, messages]),
-    call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]},
+    call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
          ArgAtoms, Timeout);
 
 action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
@@ -753,15 +760,26 @@ default_if_empty(List, Default) when is_list(List) ->
        true  -> [list_to_atom(X) || X <- List]
     end.
 
+display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
+    display_row([format_info_item(
+                   case proplists:lookup(X, Result) of
+                       none when is_list(Result), length(Result) > 0 ->
+                           exit({error, {bad_info_key, X}});
+                       none -> Result;
+                       {X, Value} -> Value
+                   end, IsEscaped) || X <- InfoItemKeys]).
+
 display_info_message(IsEscaped) ->
-    fun(Result, InfoItemKeys) ->
-        display_row([format_info_item(
-                       case proplists:lookup(X, Result) of
-                           none when is_list(Result), length(Result) > 0 ->
-                               exit({error, {bad_info_key, X}});
-                           none -> Result;
-                           {X, Value} -> Value
-                       end, IsEscaped) || X <- InfoItemKeys])
+    fun ([], _) ->
+            ok;
+        ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+            lists:foreach(fun(Result) ->
+                                  display_info_message_row(IsEscaped, Result, InfoItemKeys)
+                          end,
+                          List),
+            ok;
+        (Result, InfoItemKeys) ->
+            display_info_message_row(IsEscaped, Result, InfoItemKeys)
     end.
 
 display_info_list(Results, InfoItemKeys) when is_list(Results) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 124306487e..88a8096fd4 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -235,7 +235,7 @@ parse_free_win32(CommandResult) ->
     list_to_integer(lists:reverse(Free)).
 
 interpret_limit({mem_relative, Relative})
-  when is_float(Relative), Relative < 1 ->
+  when is_float(Relative) ->
     round(Relative * vm_memory_monitor:get_total_memory());
 interpret_limit(Absolute) ->
     case rabbit_resource_monitor_misc:parse_information_unit(Absolute) of
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index c6081fad0d..ed73a293ca 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -96,10 +96,18 @@ with_local_io(Fun) ->
     Node = node(),
     case node(GL) of
         Node -> Fun();
-        _    -> group_leader(whereis(user), self()),
+        _    -> set_group_leader_to_user(),
                 try
                     Fun()
                 after
                     group_leader(GL, self())
                 end
     end.
+
+set_group_leader_to_user() ->
+    case whereis(user) of
+        undefined ->
+            warning("the 'user' I/O process has terminated, some features will fail until Erlang VM is restarted");
+        User ->
+            group_leader(User, self())
+    end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 057a4fad31..e447e9de82 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -43,7 +43,8 @@
                  backing_queue_state,
                  seen_status,
                  confirmed,
-                 known_senders
+                 known_senders,
+                 wait_timeout
                }).
 
 -ifdef(use_specs).
@@ -130,7 +131,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
              backing_queue_state = BQS,
              seen_status         = dict:new(),
              confirmed           = [],
-             known_senders       = sets:new() }.
+             known_senders       = sets:new(),
+             wait_timeout        = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
 
 stop_mirroring(State = #state { coordinator         = CPid,
                                 backing_queue       = BQ,
@@ -203,7 +205,7 @@ delete_and_terminate(Reason, State = #state { backing_queue       = BQ,
     stop_all_slaves(Reason, State),
     State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
 
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
     {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
     PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
     ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
@@ -215,7 +217,7 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
     [receive
          {'DOWN', MRef, process, _Pid, _Info} ->
              ok
-     after 15000 ->
+     after WT ->
              rabbit_mirror_queue_misc:log_warning(
                QName, "Missing 'DOWN' message from ~p in node ~p~n",
                [Pid, node(Pid)]),
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index d5f7328fec..82effb4fc5 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -45,7 +45,7 @@ memory() ->
     Mnesia       = mnesia_memory(),
     MsgIndexETS  = ets_memory([msg_store_persistent, msg_store_transient]),
-    MgmtDbETS    = ets_memory([rabbit_mgmt_db]),
+    MgmtDbETS    = ets_memory([rabbit_mgmt_event_collector]),
 
     [{total,     Total},
      {processes, Processes},
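The rabbit_mirror_queue_master change above turns the previously hard-coded 15000 ms 'DOWN' wait into the slave_wait_timeout key of the rabbit application environment. A sketch of overriding it in the classic Erlang-term rabbitmq.config of this era; 30000 is an illustrative value, the default remains 15000:

    %% rabbitmq.config
    [
      {rabbit, [
        {slave_wait_timeout, 30000}
      ]}
    ].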
