summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-04-20 09:03:52 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-04-20 09:03:52 +0300
commiteb1f6e5cdd8e53164135c10dc79f9a9496d53755 (patch)
treee0e429f8adbd81e378c776547c462669eee14fcc /src
parent628d803b95a6aa93e82e6b6e53dce70a7513a522 (diff)
parent57c4a7af72268b8cf88efd2ec774616dd14aa31f (diff)
downloadrabbitmq-server-git-eb1f6e5cdd8e53164135c10dc79f9a9496d53755.tar.gz
Merge branch 'stable' into rabbitmq-server-343bis
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl351
-rw-r--r--src/rabbit.erl142
-rw-r--r--src/rabbit_cli.erl32
-rw-r--r--src/rabbit_control_main.erl42
-rw-r--r--src/rabbit_disk_monitor.erl2
-rw-r--r--src/rabbit_log.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl10
-rw-r--r--src/rabbit_vm.erl2
8 files changed, 424 insertions, 167 deletions
diff --git a/src/gm.erl b/src/gm.erl
index aeb050e15f..199cf7c4de 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -617,14 +617,20 @@ handle_call({add_on_right, NewMember}, _From,
group_name = GroupName,
members_state = MembersState,
txn_executor = TxnFun }) ->
- Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
- View1 = group_to_view(Group),
- MembersState1 = remove_erased_members(MembersState, View1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(MembersState1)}),
- {Result, State1} = change_view(View1, State #state {
- members_state = MembersState1 }),
- handle_callback_result({Result, {ok, Group}, State1}).
+ try
+ Group = record_new_member_in_group(
+ NewMember, Self, GroupName, TxnFun),
+ View1 = group_to_view(check_membership(Self, Group)),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(MembersState1)}),
+ {Result, State1} = change_view(View1, State #state {
+ members_state = MembersState1 }),
+ handle_callback_result({Result, {ok, Group}, State1})
+ catch
+ lost_membership ->
+ {stop, normal, State}
+ end.
%% add_on_right causes a catchup to be sent immediately from the left,
%% so we can never see this from the left neighbour. However, it's
@@ -638,19 +644,28 @@ handle_cast({?TAG, _ReqVer, check_neighbours},
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ self = Self,
members_state = MembersState,
group_name = GroupName }) ->
- {Result, State1} =
- case needs_view_update(ReqVer, View) of
- true -> View1 = group_to_view(dirty_read_group(GroupName)),
- MemberState1 = remove_erased_members(MembersState, View1),
- change_view(View1, State #state {
- members_state = MemberState1 });
- false -> {ok, State}
- end,
- handle_callback_result(
- if_callback_success(
- Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+ try
+ {Result, State1} =
+ case needs_view_update(ReqVer, View) of
+ true ->
+ View1 = group_to_view(
+ check_membership(Self,
+ dirty_read_group(GroupName))),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ change_view(View1, State #state {
+ members_state = MemberState1 });
+ false -> {ok, State}
+ end,
+ handle_callback_result(
+ if_callback_success(
+ Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
+ catch
+ lost_membership ->
+ {stop, normal, State}
+ end;
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { shutting_down = {true, _} }) ->
@@ -724,39 +739,44 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
group_name = GroupName,
confirms = Confirms,
txn_executor = TxnFun }) ->
- Member = case {Left, Right} of
- {{Member1, MRef}, _} -> Member1;
- {_, {Member1, MRef}} -> Member1;
- _ -> undefined
- end,
- case {Member, Reason} of
- {undefined, _} ->
- noreply(State);
- {_, {shutdown, ring_shutdown}} ->
- noreply(State);
- _ ->
- %% In the event of a partial partition we could see another member
- %% go down and then remove them from Mnesia. While they can
- %% recover from this they'd have to restart the queue - not
- %% ideal. So let's sleep here briefly just in case this was caused
- %% by a partial partition; in which case by the time we record the
- %% member death in Mnesia we will probably be in a full
- %% partition and will not be assassinating another member.
- timer:sleep(100),
- View1 = group_to_view(record_dead_member_in_group(
- Member, GroupName, TxnFun)),
- handle_callback_result(
- case alive_view_members(View1) of
- [Self] -> maybe_erase_aliases(
- State #state {
- members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) },
- View1);
- _ -> change_view(View1, State)
- end)
+ try
+ check_membership(GroupName),
+ Member = case {Left, Right} of
+ {{Member1, MRef}, _} -> Member1;
+ {_, {Member1, MRef}} -> Member1;
+ _ -> undefined
+ end,
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
+ noreply(State);
+ _ ->
+ %% In the event of a partial partition we could see another member
+ %% go down and then remove them from Mnesia. While they can
+ %% recover from this they'd have to restart the queue - not
+ %% ideal. So let's sleep here briefly just in case this was caused
+ %% by a partial partition; in which case by the time we record the
+ %% member death in Mnesia we will probably be in a full
+ %% partition and will not be assassinating another member.
+ timer:sleep(100),
+ View1 = group_to_view(record_dead_member_in_group(Self,
+ Member, GroupName, TxnFun, true)),
+ handle_callback_result(
+ case alive_view_members(View1) of
+ [Self] -> maybe_erase_aliases(
+ State #state {
+ members_state = blank_member_state(),
+ confirms = purge_confirms(Confirms) },
+ View1);
+ _ -> change_view(View1, State)
+ end)
+ end
+ catch
+ lost_membership ->
+ {stop, normal, State}
end.
-
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
@@ -841,52 +861,30 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) ->
handle_msg({activity, Left, Activity},
State = #state { self = Self,
+ group_name = GroupName,
left = {Left, _MRefL},
view = View,
members_state = MembersState,
confirms = Confirms })
when MembersState =/= undefined ->
- {MembersState1, {Confirms1, Activity1}} =
- lists:foldl(
- fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
- with_member_acc(
- fun (Member = #member { pending_ack = PA,
- last_pub = LP,
- last_ack = LA },
- {Confirms2, Activity2}) ->
- case is_member_alias(Id, Self, View) of
- true ->
- {ToAck, PA1} =
- find_common(queue_from_pubs(Pubs), PA,
- queue:new()),
- LA1 = last_ack(Acks, LA),
- AckNums = acks_from_queue(ToAck),
- Confirms3 = maybe_confirm(
- Self, Id, Confirms2, AckNums),
- {Member #member { pending_ack = PA1,
- last_ack = LA1 },
- {Confirms3,
- activity_cons(
- Id, [], AckNums, Activity2)}};
- false ->
- PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
- LA1 = last_ack(Acks, LA),
- LP1 = last_pub(Pubs, LP),
- {Member #member { pending_ack = PA1,
- last_pub = LP1,
- last_ack = LA1 },
- {Confirms2,
- activity_cons(Id, Pubs, Acks, Activity2)}}
- end
- end, Id, MembersStateConfirmsActivity)
- end, {MembersState, {Confirms, activity_nil()}}, Activity),
- State1 = State #state { members_state = MembersState1,
- confirms = Confirms1 },
- Activity3 = activity_finalise(Activity1),
- ok = maybe_send_activity(Activity3, State1),
- {Result, State2} = maybe_erase_aliases(State1, View),
- if_callback_success(
- Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
+ try
+ %% If we have to stop, do it asap so we avoid any ack confirmation
+ %% Membership must be checked again by erase_members_in_group, as the
+ %% node can be marked as dead on the meanwhile
+ check_membership(GroupName),
+ {MembersState1, {Confirms1, Activity1}} =
+ calculate_activity(MembersState, Confirms, Activity, Self, View),
+ State1 = State #state { members_state = MembersState1,
+ confirms = Confirms1 },
+ Activity3 = activity_finalise(Activity1),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
+ if_callback_success(
+ Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
+ catch
+ lost_membership ->
+ {{stop, normal}, State}
+ end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
{ok, State}.
@@ -1091,8 +1089,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(
- Left, GroupName, TxnFun),
+ record_dead_member_in_group(Self,
+ Left, GroupName, TxnFun, false),
TxnFun)
end,
try
@@ -1142,47 +1140,84 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
end
end).
-record_dead_member_in_group(Member, GroupName, TxnFun) ->
- TxnFun(
- fun () ->
- Group = #gm_group { members = Members, version = Ver } =
- read_group(GroupName),
- case lists:splitwith(
- fun (Member1) -> Member1 =/= Member end, Members) of
- {_Members1, []} -> %% not found - already recorded dead
- Group;
- {Members1, [Member | Members2]} ->
- Members3 = Members1 ++ [{dead, Member} | Members2],
- write_group(Group #gm_group { members = Members3,
- version = Ver + 1 })
- end
- end).
+record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = Members, version = Ver } =
+ case Verify of
+ true ->
+ check_membership(Self, read_group(GroupName));
+ false ->
+ read_group(GroupName)
+ end,
+ case lists:splitwith(
+ fun (Member1) -> Member1 =/= Member end, Members) of
+ {_Members1, []} -> %% not found - already recorded dead
+ Group;
+ {Members1, [Member | Members2]} ->
+ Members3 = Members1 ++ [{dead, Member} | Members2],
+ write_group(Group #gm_group { members = Members3,
+ version = Ver + 1 })
+ end
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
+ end
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
+
+handle_lost_membership_in_txn(TxnFun, Fun) ->
+ case TxnFun(Fun) of
+ {error, lost_membership = T} ->
+ throw(T);
+ Any ->
+ Any
+ end.
record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
- TxnFun(
- fun () ->
- Group = #gm_group { members = Members, version = Ver } =
- read_group(GroupName),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
- end).
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = Members, version = Ver } =
+ check_membership(Left, read_group(GroupName)),
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
+ end
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
-erase_members_in_group(Members, GroupName, TxnFun) ->
+erase_members_in_group(Self, Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- TxnFun(
- fun () ->
- Group = #gm_group { members = [_|_] = Members1, version = Ver } =
- read_group(GroupName),
- case Members1 -- DeadMembers of
- Members1 -> Group;
- Members2 -> write_group(
- Group #gm_group { members = Members2,
- version = Ver + 1 })
+ Fun =
+ fun () ->
+ try
+ Group = #gm_group { members = [_|_] = Members1, version = Ver } =
+ check_membership(Self, read_group(GroupName)),
+ case Members1 -- DeadMembers of
+ Members1 -> Group;
+ Members2 -> write_group(
+ Group #gm_group { members = Members2,
+ version = Ver + 1 })
+ end
+ catch
+ lost_membership ->
+ %% The transaction must not be abruptly crashed, but
+ %% leave the gen_server to stop normally
+ {error, lost_membership}
end
- end).
+ end,
+ handle_lost_membership_in_txn(TxnFun, Fun).
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
@@ -1203,7 +1238,7 @@ maybe_erase_aliases(State = #state { self = Self,
View1 = case Erasable of
[] -> View;
_ -> group_to_view(
- erase_members_in_group(Erasable, GroupName, TxnFun))
+ erase_members_in_group(Self, Erasable, GroupName, TxnFun))
end,
change_view(View1, State #state { members_state = MembersState1 }).
@@ -1378,6 +1413,41 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, Msg) ->
ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
+calculate_activity(MembersState, Confirms, Activity, Self, View) ->
+ lists:foldl(
+ fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
+ with_member_acc(
+ fun (Member = #member { pending_ack = PA,
+ last_pub = LP,
+ last_ack = LA },
+ {Confirms2, Activity2}) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {ToAck, PA1} =
+ find_common(queue_from_pubs(Pubs), PA,
+ queue:new()),
+ LA1 = last_ack(Acks, LA),
+ AckNums = acks_from_queue(ToAck),
+ Confirms3 = maybe_confirm(
+ Self, Id, Confirms2, AckNums),
+ {Member #member { pending_ack = PA1,
+ last_ack = LA1 },
+ {Confirms3,
+ activity_cons(
+ Id, [], AckNums, Activity2)}};
+ false ->
+ PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
+ LA1 = last_ack(Acks, LA),
+ LP1 = last_pub(Pubs, LP),
+ {Member #member { pending_ack = PA1,
+ last_pub = LP1,
+ last_ack = LA1 },
+ {Confirms2,
+ activity_cons(Id, Pubs, Acks, Activity2)}}
+ end
+ end, Id, MembersStateConfirmsActivity)
+ end, {MembersState, {Confirms, activity_nil()}}, Activity).
+
callback(Args, Module, Activity) ->
Result =
lists:foldl(
@@ -1530,3 +1600,24 @@ call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
monitor(Pid) -> erlang:monitor(process, Pid).
demonitor(MRef) -> erlang:demonitor(MRef).
+
+check_membership(Self, #gm_group{members = M} = Group) ->
+ case lists:member(Self, M) of
+ true ->
+ Group;
+ false ->
+ throw(lost_membership)
+ end.
+
+check_membership(GroupName) ->
+ case dirty_read_group(GroupName) of
+ #gm_group{members = M} ->
+ case lists:keymember(self(), 2, M) of
+ true ->
+ ok;
+ false ->
+ throw(lost_membership)
+ end;
+ {error, not_found} ->
+ throw(lost_membership)
+ end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 81c7eee580..2cc353d7b8 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -284,16 +284,120 @@ broker_start() ->
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
start_apps(ToBeLoaded),
- case os:type() of
- {win32, _} -> ok;
- _ -> case code:load_file(sd_notify) of
- {module, sd_notify} -> SDNotify = sd_notify,
- SDNotify:sd_notify(0, "READY=1");
- {error, _} -> os:cmd("systemd-notify --ready")
- end
- end,
+ maybe_sd_notify(),
ok = log_broker_started(rabbit_plugins:active()).
+%% Try to send systemd ready notification if it makes sense in the
+%% current environment. standard_error is used intentionally in all
+%% logging statements, so all this messages will end in systemd
+%% journal.
+maybe_sd_notify() ->
+ case sd_notify_ready() of
+ false ->
+ io:format(standard_error, "systemd READY notification failed, beware of timeouts~n", []);
+ _ ->
+ ok
+ end.
+
+sd_notify_ready() ->
+ case {os:type(), os:getenv("NOTIFY_SOCKET")} of
+ {{win32, _}, _} ->
+ true;
+ {_, [_|_]} -> %% Non-empty NOTIFY_SOCKET, give it a try
+ sd_notify_legacy() orelse sd_notify_socat();
+ _ ->
+ true
+ end.
+
+sd_notify_data() ->
+ "READY=1\nSTATUS=Initialized\nMAINPID=" ++ os:getpid() ++ "\n".
+
+sd_notify_legacy() ->
+ case code:load_file(sd_notify) of
+ {module, sd_notify} ->
+ SDNotify = sd_notify,
+ SDNotify:sd_notify(0, sd_notify_data()),
+ true;
+ {error, _} ->
+ false
+ end.
+
+%% socat(1) is the most portable way the sd_notify could be
+%% implemented in erlang, without introducing some NIF. Currently the
+%% following issues prevent us from implementing it in a more
+%% reasonable way:
+%% - systemd-notify(1) is unstable for non-root users
+%% - erlang doesn't support unix domain sockets.
+%%
+%% Some details on how we ended with such a solution:
+%% https://github.com/rabbitmq/rabbitmq-server/issues/664
+sd_notify_socat() ->
+ case sd_current_unit() of
+ {ok, Unit} ->
+ io:format(standard_error, "systemd unit for activation check: \"~s\"~n", [Unit]),
+ sd_notify_socat(Unit);
+ _ ->
+ false
+ end.
+
+socat_socket_arg("@" ++ AbstractUnixSocket) ->
+ "abstract-sendto:" ++ AbstractUnixSocket;
+socat_socket_arg(UnixSocket) ->
+ "unix-sendto:" ++ UnixSocket.
+
+sd_open_port() ->
+ open_port(
+ {spawn_executable, os:find_executable("socat")},
+ [{args, [socat_socket_arg(os:getenv("NOTIFY_SOCKET")), "STDIO"]},
+ use_stdio, out]).
+
+sd_notify_socat(Unit) ->
+ case sd_open_port() of
+ {'EXIT', Exit} ->
+ io:format(standard_error, "Failed to start socat ~p~n", [Exit]),
+ false;
+ Port ->
+ Port ! {self(), {command, sd_notify_data()}},
+ Result = sd_wait_activation(Port, Unit),
+ port_close(Port),
+ Result
+ end.
+
+sd_current_unit() ->
+ case catch re:run(os:cmd("systemctl status " ++ os:getpid()), "([-.@0-9a-zA-Z]+)", [unicode, {capture, all_but_first, list}]) of
+ {'EXIT', _} ->
+ error;
+ {match, [Unit]} ->
+ {ok, Unit};
+ _ ->
+ error
+ end.
+
+sd_wait_activation(Port, Unit) ->
+ case os:find_executable("systemctl") of
+ false ->
+ io:format(standard_error, "'systemctl' unavailable, falling back to sleep~n", []),
+ timer:sleep(5000),
+ true;
+ _ ->
+ sd_wait_activation(Port, Unit, 10)
+ end.
+
+sd_wait_activation(_, _, 0) ->
+ io:format(standard_error, "Service still in 'activating' state, bailing out~n", []),
+ false;
+sd_wait_activation(Port, Unit, AttemptsLeft) ->
+ case os:cmd("systemctl show --property=ActiveState " ++ Unit) of
+ "ActiveState=activating\n" ->
+ timer:sleep(1000),
+ sd_wait_activation(Port, Unit, AttemptsLeft - 1);
+ "ActiveState=" ++ _ ->
+ true;
+ _ = Err->
+ io:format(standard_error, "Unexpected status from systemd ~p~n", [Err]),
+ false
+ end.
+
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
case catch register(rabbit_boot, Marker) of
@@ -332,6 +436,10 @@ stop_and_halt() ->
stop()
after
rabbit_log:info("Halting Erlang VM~n", []),
+ %% Also duplicate this information to stderr, so console where
+ %% foreground broker was running (or systemd journal) will
+ %% contain information about graceful termination.
+ io:format(standard_error, "Gracefully halting Erlang VM~n", []),
init:stop()
end,
ok.
@@ -693,7 +801,8 @@ print_banner() ->
"~n ########## Logs: ~s"
"~n ###### ## ~s"
"~n ##########"
- "~n Starting broker...",
+ "~n Starting broker..."
+ "~n",
[Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
log_location(kernel), log_location(sasl)]).
@@ -722,11 +831,16 @@ log_banner() ->
rabbit_log:info("~s", [Banner]).
warn_if_kernel_config_dubious() ->
- case erlang:system_info(kernel_poll) of
- true -> ok;
- false -> rabbit_log:warning(
- "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
- "and CPU utilization may worsen.~n")
+ case os:type() of
+ {win32, _} ->
+ ok;
+ _ ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> rabbit_log:warning(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end
end,
AsyncThreads = erlang:system_info(thread_pool_size),
case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 6679d9329e..4151504956 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -18,17 +18,19 @@
-include("rabbit_cli.hrl").
-export([main/3, start_distribution/0, start_distribution/1,
- parse_arguments/4, rpc_call/4, rpc_call/5, rpc_call/7]).
+ parse_arguments/4, filter_opts/2,
+ rpc_call/4, rpc_call/5, rpc_call/7]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-type(option_name() :: string()).
+-type(option_value() :: string() | node() | boolean()).
-type(optdef() :: flag | {option, string()}).
--type(parse_result() :: {'ok', {atom(), [{string(), string()}], [string()]}} |
+-type(parse_result() :: {'ok', {atom(), [{option_name(), option_value()}], [string()]}} |
'no_command').
-
-spec(main/3 :: (fun (([string()], string()) -> parse_result()),
fun ((atom(), atom(), [any()], [any()]) -> any()),
atom()) -> no_return()).
@@ -38,6 +40,9 @@
-spec(parse_arguments/4 ::
([{atom(), [{string(), optdef()}]} | atom()],
[{string(), optdef()}], string(), [string()]) -> parse_result()).
+
+-spec(filter_opts/2 :: ([{option_name(), option_value()}], [option_name()]) -> [boolean()]).
+
-spec(rpc_call/4 :: (node(), atom(), atom(), [any()]) -> any()).
-spec(rpc_call/5 :: (node(), atom(), atom(), [any()], number()) -> any()).
-spec(rpc_call/7 :: (node(), atom(), atom(), [any()], reference(), pid(),
@@ -117,7 +122,10 @@ main(ParseFun, DoFun, UsageMod) ->
_ ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics([Node]),
- rabbit_misc:quit(?EX_UNAVAILABLE)
+ case Command of
+ stop -> rabbit_misc:quit(?EX_OK);
+ _ -> rabbit_misc:quit(?EX_UNAVAILABLE)
+ end
end;
{badrpc_multi, Reason, Nodes} ->
print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]),
@@ -241,6 +249,22 @@ process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
{none, _, _} -> no_command
end.
+%% When we have a set of flags that are used for filtering, we want by
+%% default to include every such option in our output. But if a user
+%% explicitly specified any such flag, we want to include only items
+%% which he has requested.
+filter_opts(CurrentOptionValues, AllOptionNames) ->
+ Explicit = lists:map(fun(OptName) ->
+ proplists:get_bool(OptName, CurrentOptionValues)
+ end,
+ AllOptionNames),
+ case lists:member(true, Explicit) of
+ true ->
+ Explicit;
+ false ->
+ lists:duplicate(length(AllOptionNames), true)
+ end.
+
%%----------------------------------------------------------------------------
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index b805d21e48..f63694b657 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -73,7 +73,7 @@
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
- {list_queues, [?VHOST_DEF]},
+ {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
{list_connections, [?VHOST_DEF]},
@@ -508,9 +508,15 @@ action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
- rpc_call(
+ Res = rpc_call(
Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]),
+ case Res of
+ {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
+ {error_string, rabbit_misc:format(Format, Args)};
+ _ ->
+ Res
+ end;
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
@@ -613,10 +619,11 @@ action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout)
true);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
Inform("Listing queues", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]},
+ call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
ArgAtoms, Timeout);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
@@ -753,15 +760,26 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
+display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
+ display_row([format_info_item(
+ case proplists:lookup(X, Result) of
+ none when is_list(Result), length(Result) > 0 ->
+ exit({error, {bad_info_key, X}});
+ none -> Result;
+ {X, Value} -> Value
+ end, IsEscaped) || X <- InfoItemKeys]).
+
display_info_message(IsEscaped) ->
- fun(Result, InfoItemKeys) ->
- display_row([format_info_item(
- case proplists:lookup(X, Result) of
- none when is_list(Result), length(Result) > 0 ->
- exit({error, {bad_info_key, X}});
- none -> Result;
- {X, Value} -> Value
- end, IsEscaped) || X <- InfoItemKeys])
+ fun ([], _) ->
+ ok;
+ ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+ lists:foreach(fun(Result) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys)
+ end,
+ List),
+ ok;
+ (Result, InfoItemKeys) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys)
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 124306487e..88a8096fd4 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -235,7 +235,7 @@ parse_free_win32(CommandResult) ->
list_to_integer(lists:reverse(Free)).
interpret_limit({mem_relative, Relative})
- when is_float(Relative), Relative < 1 ->
+ when is_float(Relative) ->
round(Relative * vm_memory_monitor:get_total_memory());
interpret_limit(Absolute) ->
case rabbit_resource_monitor_misc:parse_information_unit(Absolute) of
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index c6081fad0d..ed73a293ca 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -96,10 +96,18 @@ with_local_io(Fun) ->
Node = node(),
case node(GL) of
Node -> Fun();
- _ -> group_leader(whereis(user), self()),
+ _ -> set_group_leader_to_user(),
try
Fun()
after
group_leader(GL, self())
end
end.
+
+set_group_leader_to_user() ->
+ case whereis(user) of
+ undefined ->
+ warning("the 'user' I/O process has terminated, some features will fail until Erlang VM is restarted");
+ User ->
+ group_leader(User, self())
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 057a4fad31..e447e9de82 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -43,7 +43,8 @@
backing_queue_state,
seen_status,
confirmed,
- known_senders
+ known_senders,
+ wait_timeout
}).
-ifdef(use_specs).
@@ -130,7 +131,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
backing_queue_state = BQS,
seen_status = dict:new(),
confirmed = [],
- known_senders = sets:new() }.
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
@@ -203,7 +205,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
@@ -215,7 +217,7 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
[receive
{'DOWN', MRef, process, _Pid, _Info} ->
ok
- after 15000 ->
+ after WT ->
rabbit_mirror_queue_misc:log_warning(
QName, "Missing 'DOWN' message from ~p in node ~p~n",
[Pid, node(Pid)]),
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index d5f7328fec..82effb4fc5 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -45,7 +45,7 @@ memory() ->
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]),
- MgmtDbETS = ets_memory([rabbit_mgmt_db]),
+ MgmtDbETS = ets_memory([rabbit_mgmt_event_collector]),
[{total, Total},
{processes, Processes},