summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-03-13 16:39:09 +0000
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-03-23 17:50:54 +0100
commit73e12734e799696ce72b735baa7a6daf8bb38118 (patch)
tree961bc4ed1ec055f890c456f60393014ac0bfca3e /src
parent8e2ae9df61da376607624a871ee09fb134746560 (diff)
downloadrabbitmq-server-git-73e12734e799696ce72b735baa7a6daf8bb38118.tar.gz
Tune the read buffer size to prevent pathological behaviour
...such as when we are reading backwards through a file. References #69.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl71
1 files changed, 51 insertions, 20 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 5a916c75bc..43e4576494 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -180,8 +180,10 @@
write_buffer,
read_buffer,
read_buffer_pos,
- read_buffer_rem, %% Num of bytes from pos to end
- read_buffer_size_limit,
+ read_buffer_rem, %% Num of bytes from pos to end
+ read_buffer_size, %% Next size of read buffer to use
+ read_buffer_size_limit, %% Max size of read buffer to use
+ read_buffer_usage, %% Bytes we have read from it, for tuning
at_eof,
path,
mode,
@@ -339,22 +341,27 @@ read(Ref, Count) ->
[Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- offset = Offset}]) when BufRem >= Count ->
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_usage = BufUsg,
+ offset = Offset}])
+ when BufRem >= Count ->
<<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf,
- {{ok, Res}, [Handle#handle{offset = Offset + Count,
- read_buffer_pos = BufPos + Count,
- read_buffer_rem = BufRem - Count}]};
- ([Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- read_buffer_size_limit = BufSzLimit,
- hdl = Hdl,
- offset = Offset}]) ->
+ {{ok, Res}, [Handle#handle{offset = Offset + Count,
+ read_buffer_pos = BufPos + Count,
+ read_buffer_rem = BufRem - Count,
+ read_buffer_usage = BufUsg + Count }]};
+ ([Handle0]) ->
+ Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_size = BufSz,
+ hdl = Hdl,
+ offset = Offset}
+ = tune_read_buffer_limit(Handle0),
WantedCount = Count - BufRem,
- case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of
+ case prim_file_read(Hdl, lists:max([BufSz, WantedCount])) of
{ok, Data} ->
<<_:BufPos/binary, BufTl/binary>> = Buf,
ReadCount = size(Data),
@@ -369,10 +376,11 @@ read(Ref, Count) ->
OffSet1 = Offset + BufRem + WantedCount,
BufRem1 = ReadCount - WantedCount,
{{ok, <<BufTl/binary, Hd/binary>>},
- [Handle#handle{offset = OffSet1,
- read_buffer = Data,
- read_buffer_pos = WantedCount,
- read_buffer_rem = BufRem1}]}
+ [Handle#handle{offset = OffSet1,
+ read_buffer = Data,
+ read_buffer_pos = WantedCount,
+ read_buffer_rem = BufRem1,
+ read_buffer_usage = WantedCount}]}
end;
eof ->
{eof, [Handle #handle { at_eof = true }]};
@@ -789,7 +797,9 @@ new_closed_handle(Path, Mode, Options) ->
read_buffer = <<>>,
read_buffer_pos = 0,
read_buffer_rem = 0,
+ read_buffer_size = ReadBufferSize,
read_buffer_size_limit = ReadBufferSize,
+ read_buffer_usage = 0,
at_eof = false,
path = Path,
mode = Mode,
@@ -919,6 +929,27 @@ reset_read_buffer(Handle) ->
read_buffer_pos = 0,
read_buffer_rem = 0}.
+%% We come into this function whenever there's been a miss while
+%% reading from the buffer - but note that when we seek we reset the
+%% buffer, so the first read after a seek will always be a
+%% miss. Therefore in that case don't take usage = 0 as meaning the
+%% buffer was usless, we just haven't filled it yet!
+tune_read_buffer_limit(Handle = #handle{read_buffer_usage = 0}) ->
+ Handle;
+%% In this head we hve been using the buffer but now tried to read
+%% outside it. So how did we do? If we used less than the size of the
+%% buffer, make the new buffer that size. If we read 100% of what we
+%% had, then double it for next time, up to the limit that was set
+%% when we were created.
+tune_read_buffer_limit(Handle = #handle{read_buffer_usage = Usg,
+ read_buffer_size = Sz,
+ read_buffer_size_limit = Lim}) ->
+ Handle#handle{read_buffer_usage = 0,
+ read_buffer_size = erlang:min(case Usg < Sz of
+ true -> Usg;
+ false -> Usg * 2
+ end, Lim)}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;