summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl16
-rw-r--r--src/rabbit_tests.erl47
2 files changed, 40 insertions, 23 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 4f0365718f..61b08d4933 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -970,12 +970,13 @@ queue_fold(Fun, Init, Q) ->
filter_pending(Fun, {Count, Queue}) ->
{Delta, Queue1} =
- queue_fold(fun (Item, {DeltaN, QueueN}) ->
- case Fun(Item) of
- true -> {DeltaN, queue:in(Item, QueueN)};
- false -> {DeltaN - requested(Item), QueueN}
- end
- end, {0, queue:new()}, Queue),
+ queue_fold(
+ fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - Requested, QueueN}
+ end
+ end, {0, queue:new()}, Queue),
{Count + Delta, Queue1}.
pending_new() ->
@@ -1021,9 +1022,6 @@ adjust_alarm(OldState, NewState) ->
end,
NewState.
-requested({_Kind, _Pid, Requested, _From}) ->
- Requested.
-
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b8c3f4a9b8..ca046c9198 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1630,23 +1630,42 @@ test_file_handle_cache() ->
ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
+ [Src1, Dst1, Src2, Dst2] = Files =
+ [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]],
+ Content = <<"foo">>,
+ CopyFun = fun (Src, Dst) ->
+ ok = file:write_file(Src, Content),
+ {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
+ {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
+ Size = size(Content),
+ {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
+ ok = file_handle_cache:delete(SrcHdl),
+ ok = file_handle_cache:delete(DstHdl)
+ end,
Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
- filename:join(TmpDir, "file3"),
+ filename:join(TmpDir, "file5"),
[write], []),
- receive close -> ok end,
- file_handle_cache:delete(Hdl)
+ receive {next, Pid1} -> Pid1 ! {next, self()} end,
+ file_handle_cache:delete(Hdl),
+ %% This will block and never return, so we
+ %% exercise the fhc tidying up the pending
+ %% queue on the death of a process.
+ ok = CopyFun(Src1, Dst1)
end),
- Src = filename:join(TmpDir, "file1"),
- Dst = filename:join(TmpDir, "file2"),
- Content = <<"foo">>,
- ok = file:write_file(Src, Content),
- {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
- {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
- Size = size(Content),
- {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
- ok = file_handle_cache:delete(SrcHdl),
- file_handle_cache:delete(DstHdl),
- Pid ! close,
+ ok = CopyFun(Src1, Dst1),
+ ok = file_handle_cache:set_limit(2),
+ Pid ! {next, self()},
+ receive {next, Pid} -> ok end,
+ timer:sleep(100),
+ Pid1 = spawn(fun () -> CopyFun(Src2, Dst2) end),
+ timer:sleep(100),
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, Pid1),
+ exit(Pid, kill),
+ exit(Pid1, kill),
+ receive {'DOWN', _MRef, process, Pid, _Reason} -> ok end,
+ receive {'DOWN', _MRef1, process, Pid1, _Reason1} -> ok end,
+ [file:delete(File) || File <- Files],
ok = file_handle_cache:set_limit(Limit),
passed.