summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-04-12 14:30:07 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-04-12 14:30:07 +0300
commitf22376ee9da053f1a739103a7a5392781ba6e312 (patch)
tree11261a09cc30dbdb4320c0cc22cf8cdc74fbcfc5 /src
parent8ce0f08268df289ce9bf8d7d4096d3db4816fcf0 (diff)
parentc4519ddf4baf8af772b8b1a79945eef02cda17f4 (diff)
downloadrabbitmq-server-git-f22376ee9da053f1a739103a7a5392781ba6e312.tar.gz
Merge branch 'master' into rabbitmq-server-1143
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl25
-rw-r--r--src/rabbit_disk_monitor.erl45
-rw-r--r--src/rabbit_msg_store.erl57
3 files changed, 77 insertions, 50 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4e24d1b544..c52949af4c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -485,16 +485,21 @@ stop_and_halt() ->
rabbit_log:error("Error trying to stop RabbitMQ: ~p:~p", [Type, Reason]),
error({Type, Reason})
after
- AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
- rabbit_log:info(
- lists:flatten(["Halting Erlang VM with the following applications:~n",
- [" ~p~n" || _ <- AppsLeft]]),
- AppsLeft),
- %% Also duplicate this information to stderr, so console where
- %% foreground broker was running (or systemd journal) will
- %% contain information about graceful termination.
- io:format(standard_error, "Gracefully halting Erlang VM~n", []),
- init:stop()
+ %% Enclose all the logging in the try block.
+ %% init:stop() will be called regardless of any errors.
+ try
+ AppsLeft = [ A || {A, _, _} <- application:which_applications() ],
+ rabbit_log:info(
+ lists:flatten(["Halting Erlang VM with the following applications:~n",
+ [" ~p~n" || _ <- AppsLeft]]),
+ AppsLeft),
+ %% Also duplicate this information to stderr, so console where
+ %% foreground broker was running (or systemd journal) will
+ %% contain information about graceful termination.
+ io:format(standard_error, "Gracefully halting Erlang VM~n", [])
+ after
+ init:stop()
+ end
end,
ok.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b2548cb61a..629228a1be 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -65,7 +65,12 @@
alarmed,
%% is monitoring enabled? false on unsupported
%% platforms
- enabled
+ enabled,
+ %% number of retries to enable monitoring if it fails
+ %% on start-up
+ retries,
+ %% Interval between retries
+ interval
}).
%%----------------------------------------------------------------------------
@@ -114,20 +119,17 @@ start_link(Args) ->
init([Limit]) ->
Dir = dir(),
+ {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
+ {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
State = #state{dir = Dir,
min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
alarmed = false,
- enabled = true},
- case {catch get_disk_free(Dir),
- vm_memory_monitor:get_total_memory()} of
- {N1, N2} when is_integer(N1), is_integer(N2) ->
- {ok, start_timer(set_disk_limits(State, Limit))};
- Err ->
- rabbit_log:info("Disabling disk free space monitoring "
- "on unsupported platform:~n~p~n", [Err]),
- {ok, State#state{enabled = false}}
- end.
+ enabled = true,
+ limit = Limit,
+ retries = Retries,
+ interval = Interval},
+ {ok, enable(State)}.
handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
{reply, Limit, State};
@@ -161,6 +163,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Request, State) ->
{noreply, State}.
+handle_info(try_enable, #state{retries = Retries} = State) ->
+ {noreply, enable(State#state{retries = Retries - 1})};
handle_info(update, State) ->
{noreply, start_timer(internal_update(State))};
@@ -246,7 +250,7 @@ interpret_limit(Absolute) ->
emit_update_info(StateStr, CurrentFree, Limit) ->
rabbit_log:info(
- "Disk free space ~s. Free bytes:~p Limit:~p~n",
+ "Free disk space is ~s. Free bytes: ~p. Limit: ~p~n",
[StateStr, CurrentFree, Limit]).
start_timer(State) ->
@@ -261,3 +265,20 @@ interval(#state{limit = Limit,
max_interval = MaxInterval}) ->
IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
+
+enable(#state{retries = 0} = State) ->
+ State;
+enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries}
+ = State) ->
+ case {catch get_disk_free(Dir),
+ vm_memory_monitor:get_total_memory()} of
+ {N1, N2} when is_integer(N1), is_integer(N2) ->
+ rabbit_log:info("Enabling free disk space monitoring~n", []),
+ start_timer(set_disk_limits(State, Limit));
+ Err ->
+ rabbit_log:info("Free disk space monitor encountered an error "
+ "(e.g. failed to parse output from OS tools): ~p, retries left: ~s~n",
+ [Err, Retries]),
+ timer:send_after(Interval, self(), try_enable),
+ State#state{enabled = false}
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 809ec4fda3..891cdf0236 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -155,7 +155,7 @@
-type client_msstate() :: #client_msstate {
server :: server(),
client_ref :: client_ref(),
- file_handle_cache :: dict:dict(),
+ file_handle_cache :: map(),
index_state :: any(),
index_module :: atom(),
dir :: file:filename(),
@@ -495,7 +495,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom
?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
- file_handle_cache = dict:new(),
+ file_handle_cache = #{},
index_state = IState,
index_module = IModule,
dir = Dir,
@@ -703,7 +703,7 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
- State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
+ State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM),
dying_clients = maps:remove(CRef, DyingClients) }.
@@ -740,7 +740,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Name),
- Clients = dict:from_list(
+ Clients = maps:from_list(
[{CRef, {undefined, undefined, undefined}} ||
CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
@@ -776,7 +776,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
index_state = IndexState,
current_file = 0,
current_file_handle = undefined,
- file_handle_cache = dict:new(),
+ file_handle_cache = #{},
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
@@ -790,7 +790,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new(),
+ cref_to_msg_ids = #{},
credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -843,7 +843,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
@@ -874,7 +874,7 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
- State1 = State #msstate { clients = dict:erase(CRef, Clients) },
+ State1 = State #msstate { clients = maps:remove(CRef, Clients) },
noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgId, Flow},
@@ -882,7 +882,7 @@ handle_cast({write, CRef, MsgId, Flow},
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
case Flow of
- flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ flow -> {CPid, _, _} = maps:get(CRef, Clients),
%% We are going to process a message sent by the
%% rabbit_amqqueue_process. Now we are accessing the
%% msg_store process dictionary.
@@ -1003,7 +1003,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
- case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ case store_recovery_terms([{client_refs, maps:keys(Clients)},
{index_module, IndexModule}], Dir) of
ok ->
rabbit_log:info("Message store for directory '~s' is stopped", [Dir]),
@@ -1035,12 +1035,12 @@ reply(Reply, State) ->
next_state(State = #msstate { sync_timer_ref = undefined,
cref_to_msg_ids = CTM }) ->
- case dict:size(CTM) of
+ case maps:size(CTM) of
0 -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
- case dict:size(CTM) of
+ case maps:size(CTM) of
0 -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -1055,7 +1055,7 @@ stop_sync_timer(State) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (CRef, MsgIds, NS) ->
+ CGs = maps:fold(fun (CRef, MsgIds, NS) ->
case gb_sets:is_empty(MsgIds) of
true -> NS;
false -> [{CRef, MsgIds} | NS]
@@ -1327,7 +1327,7 @@ orddict_store(Key, Val, Dict) ->
update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
- case dict:fetch(CRef, Clients) of
+ case maps:get(CRef, Clients) of
{_CPid, undefined, _CloseFDsFun} -> State;
{_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
State #msstate {
@@ -1337,21 +1337,22 @@ update_pending_confirms(Fun, CRef,
record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
fun (_MsgOnDiskFun, CTM) ->
- dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
- gb_sets:singleton(MsgId), CTM)
+ maps:update_with(CRef,
+ fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
+ gb_sets:singleton(MsgId), CTM)
end, CRef, State).
client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
- case dict:find(CRef, CTM) of
+ case maps:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
ActionTaken),
MsgIds1 = rabbit_misc:gb_sets_difference(
Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
- true -> dict:erase(CRef, CTM);
- false -> dict:store(CRef, MsgIds1, CTM)
+ true -> maps:remove(CRef, CTM);
+ false -> maps:put(CRef, MsgIds1, CTM)
end;
error -> CTM
end
@@ -1402,9 +1403,9 @@ close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
State #msstate { file_handle_cache = close_handle(Key, FHC) };
close_handle(Key, FHC) ->
- case dict:find(Key, FHC) of
+ case maps:find(Key, FHC) of
{ok, Hdl} -> ok = file_handle_cache:close(Hdl),
- dict:erase(Key, FHC);
+ maps:remove(Key, FHC);
error -> FHC
end.
@@ -1419,7 +1420,7 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
[ begin
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
- true -> case dict:fetch(Ref, ClientRefs) of
+ true -> case maps:get(Ref, ClientRefs) of
{_CPid, _MsgOnDiskFun, undefined} ->
ok;
{_CPid, _MsgOnDiskFun, CloseFDsFun} ->
@@ -1456,16 +1457,16 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_handle_cache = FHC,
client_ref = Ref }) ->
- ok = dict:fold(fun (File, Hdl, ok) ->
+ ok = maps:fold(fun (File, Hdl, ok) ->
true = ets:delete(FileHandlesEts, {Ref, File}),
file_handle_cache:close(Hdl)
end, ok, FHC),
- CState #client_msstate { file_handle_cache = dict:new() };
+ CState #client_msstate { file_handle_cache = #{} };
close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
- ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
+ ok = maps:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end,
ok, FHC),
- State #msstate { file_handle_cache = dict:new() }.
+ State #msstate { file_handle_cache = #{} }.
get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
dir = Dir }) ->
@@ -1478,11 +1479,11 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
{Hdl, State #msstate { file_handle_cache = FHC2 }}.
get_read_handle(FileNum, FHC, Dir) ->
- case dict:find(FileNum, FHC) of
+ case maps:find(FileNum, FHC) of
{ok, Hdl} -> {Hdl, FHC};
error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum),
?READ_MODE),
- {Hdl, dict:store(FileNum, Hdl, FHC)}
+ {Hdl, maps:put(FileNum, Hdl, FHC)}
end.
preallocate(Hdl, FileSizeLimit, FinalPos) ->