summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl42
1 files changed, 40 insertions, 2 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index b323842ba0..dfbfa41fa0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -148,7 +148,7 @@
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2,
- info/0, info/1]).
+ info/0, info/1, clear_read_cache/0]).
-export([ulimit/0]).
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
@@ -164,6 +164,8 @@
-define(CLIENT_ETS_TABLE, file_handle_cache_client).
-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
+-include("rabbit.hrl"). % For #amqqueue record definition.
+
%%----------------------------------------------------------------------------
-record(file,
@@ -581,6 +583,38 @@ info_keys() -> ?INFO_KEYS.
info() -> info(?INFO_KEYS).
info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
+clear_read_cache() ->
+ gen_server2:cast(?SERVER, clear_read_cache),
+ clear_vhost_read_cache(rabbit_vhost:list()).
+
+clear_vhost_read_cache([]) ->
+ ok;
+clear_vhost_read_cache([VHost | Rest]) ->
+ clear_queue_read_cache(rabbit_amqqueue:list(VHost)),
+ clear_vhost_read_cache(Rest).
+
+clear_queue_read_cache([]) ->
+ ok;
+clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) ->
+ Fun = fun(_, State) ->
+ clear_process_read_cache(),
+ State
+ end,
+ rabbit_amqqueue:run_backing_queue(MPid, rabbit_variable_queue, Fun),
+ [rabbit_amqqueue:run_backing_queue(SPid, rabbit_variable_queue, Fun)
+ || SPid <- SPids],
+ clear_queue_read_cache(Rest).
+
+clear_process_read_cache() ->
+ [
+ begin
+ Handle1 = reset_read_buffer(Handle),
+ put({Ref, fhc_handle}, Handle1)
+ end ||
+ {{Ref, fhc_handle}, Handle} <- get(),
+ size(Handle#handle.read_buffer) > 0
+ ].
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -1147,7 +1181,11 @@ handle_cast({transfer, N, FromPid, ToPid}, State) ->
{noreply, process_pending(
update_counts({obtain, socket}, ToPid, +N,
update_counts({obtain, socket}, FromPid, -N,
- State)))}.
+ State)))};
+
+handle_cast(clear_read_cache, State) ->
+ clear_process_read_cache(),
+ {noreply, State}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};