diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-06-23 19:19:41 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-06-24 16:16:00 +0200 |
| commit | 8a485193c7a00fd6685a2ebe9175fba11fd8aa34 (patch) | |
| tree | b521445e74110419b860cd32ba83746ae3557d3d | |
| parent | 677bb64ffd177f72d6131a8a6202b7c8a4ff6f24 (diff) | |
| download | rabbitmq-server-git-8a485193c7a00fd6685a2ebe9175fba11fd8aa34.tar.gz | |
file_handle_cache: Add a command to clear all read buffers
This is meant to be called from `rabbitmqctl`:
rabbitmqctl eval 'file_handle_cache:clear_read_cache().'
The command sends a message to all queues on all vhosts to make them
clear their own read cache. It is asynchronous, so the command will
return before the action is complete.
Fixes #196.
| -rw-r--r-- | src/file_handle_cache.erl | 42 |
1 files changed, 40 insertions, 2 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index b323842ba0..dfbfa41fa0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,7 +148,7 @@ copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, - info/0, info/1]). + info/0, info/1, clear_read_cache/0]). -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, @@ -164,6 +164,8 @@ -define(CLIENT_ETS_TABLE, file_handle_cache_client). -define(ELDERS_ETS_TABLE, file_handle_cache_elders). +-include("rabbit.hrl"). % For #amqqueue record definition. + %%---------------------------------------------------------------------------- -record(file, @@ -581,6 +583,38 @@ info_keys() -> ?INFO_KEYS. info() -> info(?INFO_KEYS). info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). +clear_read_cache() -> + gen_server2:cast(?SERVER, clear_read_cache), + clear_vhost_read_cache(rabbit_vhost:list()). + +clear_vhost_read_cache([]) -> + ok; +clear_vhost_read_cache([VHost | Rest]) -> + clear_queue_read_cache(rabbit_amqqueue:list(VHost)), + clear_vhost_read_cache(Rest). + +clear_queue_read_cache([]) -> + ok; +clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) -> + Fun = fun(_, State) -> + clear_process_read_cache(), + State + end, + rabbit_amqqueue:run_backing_queue(MPid, rabbit_variable_queue, Fun), + [rabbit_amqqueue:run_backing_queue(SPid, rabbit_variable_queue, Fun) + || SPid <- SPids], + clear_queue_read_cache(Rest). + +clear_process_read_cache() -> + [ + begin + Handle1 = reset_read_buffer(Handle), + put({Ref, fhc_handle}, Handle1) + end || + {{Ref, fhc_handle}, Handle} <- get(), + size(Handle#handle.read_buffer) > 0 + ]. + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -1147,7 +1181,11 @@ handle_cast({transfer, N, FromPid, ToPid}, State) -> {noreply, process_pending( update_counts({obtain, socket}, ToPid, +N, update_counts({obtain, socket}, FromPid, -N, - State)))}. + State)))}; + +handle_cast(clear_read_cache, State) -> + clear_process_read_cache(), + {noreply, State}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; |
