diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-04-12 14:30:07 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-04-12 14:30:07 +0300 |
| commit | f22376ee9da053f1a739103a7a5392781ba6e312 (patch) | |
| tree | 11261a09cc30dbdb4320c0cc22cf8cdc74fbcfc5 /src | |
| parent | 8ce0f08268df289ce9bf8d7d4096d3db4816fcf0 (diff) | |
| parent | c4519ddf4baf8af772b8b1a79945eef02cda17f4 (diff) | |
| download | rabbitmq-server-git-f22376ee9da053f1a739103a7a5392781ba6e312.tar.gz | |
Merge branch 'master' into rabbitmq-server-1143
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 57 |
3 files changed, 77 insertions, 50 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 4e24d1b544..c52949af4c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -485,16 +485,21 @@ stop_and_halt() -> rabbit_log:error("Error trying to stop RabbitMQ: ~p:~p", [Type, Reason]), error({Type, Reason}) after - AppsLeft = [ A || {A, _, _} <- application:which_applications() ], - rabbit_log:info( - lists:flatten(["Halting Erlang VM with the following applications:~n", - [" ~p~n" || _ <- AppsLeft]]), - AppsLeft), - %% Also duplicate this information to stderr, so console where - %% foreground broker was running (or systemd journal) will - %% contain information about graceful termination. - io:format(standard_error, "Gracefully halting Erlang VM~n", []), - init:stop() + %% Enclose all the logging in the try block. + %% init:stop() will be called regardless of any errors. + try + AppsLeft = [ A || {A, _, _} <- application:which_applications() ], + rabbit_log:info( + lists:flatten(["Halting Erlang VM with the following applications:~n", + [" ~p~n" || _ <- AppsLeft]]), + AppsLeft), + %% Also duplicate this information to stderr, so console where + %% foreground broker was running (or systemd journal) will + %% contain information about graceful termination. + io:format(standard_error, "Gracefully halting Erlang VM~n", []) + after + init:stop() + end end, ok. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index b2548cb61a..629228a1be 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -65,7 +65,12 @@ alarmed, %% is monitoring enabled? false on unsupported %% platforms - enabled + enabled, + %% number of retries to enable monitoring if it fails + %% on start-up + retries, + %% Interval between retries + interval }). %%---------------------------------------------------------------------------- @@ -114,20 +119,17 @@ start_link(Args) -> init([Limit]) -> Dir = dir(), + {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), + {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), State = #state{dir = Dir, min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, alarmed = false, - enabled = true}, - case {catch get_disk_free(Dir), - vm_memory_monitor:get_total_memory()} of - {N1, N2} when is_integer(N1), is_integer(N2) -> - {ok, start_timer(set_disk_limits(State, Limit))}; - Err -> - rabbit_log:info("Disabling disk free space monitoring " - "on unsupported platform:~n~p~n", [Err]), - {ok, State#state{enabled = false}} - end. + enabled = true, + limit = Limit, + retries = Retries, + interval = Interval}, + {ok, enable(State)}. handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) -> {reply, Limit, State}; @@ -161,6 +163,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}. +handle_info(try_enable, #state{retries = Retries} = State) -> + {noreply, enable(State#state{retries = Retries - 1})}; handle_info(update, State) -> {noreply, start_timer(internal_update(State))}; @@ -246,7 +250,7 @@ interpret_limit(Absolute) -> emit_update_info(StateStr, CurrentFree, Limit) -> rabbit_log:info( - "Disk free space ~s. Free bytes:~p Limit:~p~n", + "Free disk space is ~s. Free bytes: ~p. Limit: ~p~n", [StateStr, CurrentFree, Limit]). start_timer(State) -> @@ -261,3 +265,20 @@ interval(#state{limit = Limit, max_interval = MaxInterval}) -> IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). + +enable(#state{retries = 0} = State) -> + State; +enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} + = State) -> + case {catch get_disk_free(Dir), + vm_memory_monitor:get_total_memory()} of + {N1, N2} when is_integer(N1), is_integer(N2) -> + rabbit_log:info("Enabling free disk space monitoring~n", []), + start_timer(set_disk_limits(State, Limit)); + Err -> + rabbit_log:info("Free disk space monitor encountered an error " + "(e.g. failed to parse output from OS tools): ~p, retries left: ~s~n", + [Err, Retries]), + timer:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 809ec4fda3..891cdf0236 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -155,7 +155,7 @@ -type client_msstate() :: #client_msstate { server :: server(), client_ref :: client_ref(), - file_handle_cache :: dict:dict(), + file_handle_cache :: map(), index_state :: any(), index_module :: atom(), dir :: file:filename(), @@ -495,7 +495,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom ?CREDIT_DISC_BOUND), #client_msstate { server = Server, client_ref = Ref, - file_handle_cache = dict:new(), + file_handle_cache = #{}, index_state = IState, index_module = IModule, dir = Dir, @@ -703,7 +703,7 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> - State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), + State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM), dying_clients = maps:remove(CRef, DyingClients) }. @@ -740,7 +740,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Name), - Clients = dict:from_list( + Clients = maps:from_list( [{CRef, {undefined, undefined, undefined}} || CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both @@ -776,7 +776,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> index_state = IndexState, current_file = 0, current_file_handle = undefined, - file_handle_cache = dict:new(), + file_handle_cache = #{}, sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, @@ -790,7 +790,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - cref_to_msg_ids = dict:new(), + cref_to_msg_ids = #{}, credit_disc_bound = CreditDiscBound }, %% If we didn't recover the msg location index then we need to @@ -843,7 +843,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> - Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), + Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), erlang:monitor(process, CPid), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, @@ -874,7 +874,7 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> - State1 = State #msstate { clients = dict:erase(CRef, Clients) }, + State1 = State #msstate { clients = maps:remove(CRef, Clients) }, noreply(clear_client(CRef, State1)); handle_cast({write, CRef, MsgId, Flow}, @@ -882,7 +882,7 @@ handle_cast({write, CRef, MsgId, Flow}, clients = Clients, credit_disc_bound = CreditDiscBound }) -> case Flow of - flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + flow -> {CPid, _, _} = maps:get(CRef, Clients), %% We are going to process a message sent by the %% rabbit_amqqueue_process. Now we are accessing the %% msg_store process dictionary. @@ -1003,7 +1003,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), - case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, + case store_recovery_terms([{client_refs, maps:keys(Clients)}, {index_module, IndexModule}], Dir) of ok -> rabbit_log:info("Message store for directory '~s' is stopped", [Dir]), @@ -1035,12 +1035,12 @@ reply(Reply, State) -> next_state(State = #msstate { sync_timer_ref = undefined, cref_to_msg_ids = CTM }) -> - case dict:size(CTM) of + case maps:size(CTM) of 0 -> {State, hibernate}; _ -> {start_sync_timer(State), 0} end; next_state(State = #msstate { cref_to_msg_ids = CTM }) -> - case dict:size(CTM) of + case maps:size(CTM) of 0 -> {stop_sync_timer(State), hibernate}; _ -> {State, 0} end. @@ -1055,7 +1055,7 @@ stop_sync_timer(State) -> internal_sync(State = #msstate { current_file_handle = CurHdl, cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), - CGs = dict:fold(fun (CRef, MsgIds, NS) -> + CGs = maps:fold(fun (CRef, MsgIds, NS) -> case gb_sets:is_empty(MsgIds) of true -> NS; false -> [{CRef, MsgIds} | NS] @@ -1327,7 +1327,7 @@ orddict_store(Key, Val, Dict) -> update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> - case dict:fetch(CRef, Clients) of + case maps:get(CRef, Clients) of {_CPid, undefined, _CloseFDsFun} -> State; {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), State #msstate { @@ -1337,21 +1337,22 @@ update_pending_confirms(Fun, CRef, record_pending_confirm(CRef, MsgId, State) -> update_pending_confirms( fun (_MsgOnDiskFun, CTM) -> - dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end, - gb_sets:singleton(MsgId), CTM) + maps:update_with(CRef, + fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end, + gb_sets:singleton(MsgId), CTM) end, CRef, State). client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTM) -> - case dict:find(CRef, CTM) of + case maps:find(CRef, CTM) of {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), ActionTaken), MsgIds1 = rabbit_misc:gb_sets_difference( Gs, MsgIds), case gb_sets:is_empty(MsgIds1) of - true -> dict:erase(CRef, CTM); - false -> dict:store(CRef, MsgIds1, CTM) + true -> maps:remove(CRef, CTM); + false -> maps:put(CRef, MsgIds1, CTM) end; error -> CTM end @@ -1402,9 +1403,9 @@ close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> State #msstate { file_handle_cache = close_handle(Key, FHC) }; close_handle(Key, FHC) -> - case dict:find(Key, FHC) of + case maps:find(Key, FHC) of {ok, Hdl} -> ok = file_handle_cache:close(Hdl), - dict:erase(Key, FHC); + maps:remove(Key, FHC); error -> FHC end. @@ -1419,7 +1420,7 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> [ begin case (ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke) of - true -> case dict:fetch(Ref, ClientRefs) of + true -> case maps:get(Ref, ClientRefs) of {_CPid, _MsgOnDiskFun, undefined} -> ok; {_CPid, _MsgOnDiskFun, CloseFDsFun} -> @@ -1456,16 +1457,16 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, file_handle_cache = FHC, client_ref = Ref }) -> - ok = dict:fold(fun (File, Hdl, ok) -> + ok = maps:fold(fun (File, Hdl, ok) -> true = ets:delete(FileHandlesEts, {Ref, File}), file_handle_cache:close(Hdl) end, ok, FHC), - CState #client_msstate { file_handle_cache = dict:new() }; + CState #client_msstate { file_handle_cache = #{} }; close_all_handles(State = #msstate { file_handle_cache = FHC }) -> - ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end, + ok = maps:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end, ok, FHC), - State #msstate { file_handle_cache = dict:new() }. + State #msstate { file_handle_cache = #{} }. get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC, dir = Dir }) -> @@ -1478,11 +1479,11 @@ get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC, {Hdl, State #msstate { file_handle_cache = FHC2 }}. get_read_handle(FileNum, FHC, Dir) -> - case dict:find(FileNum, FHC) of + case maps:find(FileNum, FHC) of {ok, Hdl} -> {Hdl, FHC}; error -> {ok, Hdl} = open_file(Dir, filenum_to_name(FileNum), ?READ_MODE), - {Hdl, dict:store(FileNum, Hdl, FHC)} + {Hdl, maps:put(FileNum, Hdl, FHC)} end. preallocate(Hdl, FileSizeLimit, FinalPos) -> |
