diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-06-24 19:02:14 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-06-24 19:02:14 +0200 |
| commit | 46d07781a53a983f093af6ddd11b816dfea5dc1a (patch) | |
| tree | 631e886d24d6ef8a6f75c07535ebdd1e8b80b9e0 | |
| parent | a4c2eeed06c9cda4e6900a9d299e8c14e782950c (diff) | |
| parent | 4b8bcce21475811d6525580838242e0529e0276f (diff) | |
| download | rabbitmq-server-git-46d07781a53a983f093af6ddd11b816dfea5dc1a.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | src/file_handle_cache.erl | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index b323842ba0..d8203af877 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,7 +148,7 @@ copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, - info/0, info/1]). + info/0, info/1, clear_read_cache/0]). -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, @@ -164,6 +164,8 @@ -define(CLIENT_ETS_TABLE, file_handle_cache_client). -define(ELDERS_ETS_TABLE, file_handle_cache_elders). +-include("rabbit.hrl"). % For #amqqueue record definition. + %%---------------------------------------------------------------------------- -record(file, @@ -581,6 +583,42 @@ info_keys() -> ?INFO_KEYS. info() -> info(?INFO_KEYS). info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). +clear_read_cache() -> + gen_server2:cast(?SERVER, clear_read_cache), + clear_vhost_read_cache(rabbit_vhost:list()). + +clear_vhost_read_cache([]) -> + ok; +clear_vhost_read_cache([VHost | Rest]) -> + clear_queue_read_cache(rabbit_amqqueue:list(VHost)), + clear_vhost_read_cache(Rest). + +clear_queue_read_cache([]) -> + ok; +clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) -> + %% Limit the action to the current node. + Pids = [P || P <- [MPid | SPids], node(P) =:= node()], + %% This function is executed in the context of the backing queue + %% process because the read buffer is stored in the process + %% dictionary. + Fun = fun(_, State) -> + clear_process_read_cache(), + State + end, + [rabbit_amqqueue:run_backing_queue(Pid, rabbit_variable_queue, Fun) + || Pid <- Pids], + clear_queue_read_cache(Rest). + +clear_process_read_cache() -> + [ + begin + Handle1 = reset_read_buffer(Handle), + put({Ref, fhc_handle}, Handle1) + end || + {{Ref, fhc_handle}, Handle} <- get(), + size(Handle#handle.read_buffer) > 0 + ]. + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -1147,7 +1185,11 @@ handle_cast({transfer, N, FromPid, ToPid}, State) -> {noreply, process_pending( update_counts({obtain, socket}, ToPid, +N, update_counts({obtain, socket}, FromPid, -N, - State)))}. + State)))}; + +handle_cast(clear_read_cache, State) -> + clear_process_read_cache(), + {noreply, State}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; |
