summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-21 17:05:38 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-21 17:05:38 +0100
commit062199e0329321153838b080a3c5b6ebe087bfe2 (patch)
tree7a2ffa08e3b074970c25a1d909f0ba7f55bfd21c
parenta10db26a6dfc303b5551dafbf4575a77926eca12 (diff)
downloadrabbitmq-server-git-062199e0329321153838b080a3c5b6ebe087bfe2.tar.gz
Added write buffers. Performance now very very good, and could get better if can get msg_store to use the new fhc. All tests pass. Sometimes.
-rw-r--r--src/horrendously_dumb_file_handle_cache.erl97
1 files changed, 59 insertions, 38 deletions
diff --git a/src/horrendously_dumb_file_handle_cache.erl b/src/horrendously_dumb_file_handle_cache.erl
index c034569ddf..e3aa49b3ca 100644
--- a/src/horrendously_dumb_file_handle_cache.erl
+++ b/src/horrendously_dumb_file_handle_cache.erl
@@ -38,7 +38,7 @@
{ hdl,
current_offset,
last_sync_offset,
- is_dirty,
+ write_buffer,
is_append,
at_eof,
path_mode_key }).
@@ -57,7 +57,7 @@ open(Path, Mode, [] = _ExtraOptions, State) ->
Ref = make_ref(),
put({fhc, path_mode_ref, Key}, {ref, Ref}),
Entry = #entry { hdl = Hdl, current_offset = 0,
- last_sync_offset = 0, is_dirty = false,
+ last_sync_offset = 0, write_buffer = [],
is_append = lists:member(append, Mode1),
at_eof = false, path_mode_key = Key },
put({fhc, ref_entry, Ref}, Entry),
@@ -70,10 +70,11 @@ open(Path, Mode, [] = _ExtraOptions, State) ->
close(Ref, State) ->
{ok,
case erase({fhc, ref_entry, Ref}) of
- #entry { hdl = Hdl, is_dirty = IsDirty, path_mode_key = Key } ->
- ok = case IsDirty of
- true -> file:sync(Hdl);
- false -> ok
+ #entry { hdl = Hdl, write_buffer = WriteBuffer, path_mode_key = Key } ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> ok = file:write(Hdl, lists:reverse(WriteBuffer)),
+ file:sync(Hdl)
end,
ok = file:close(Hdl),
erase({fhc, path_mode_ref, Key}),
@@ -86,7 +87,12 @@ release(_Ref, State) -> %% noop for the time being
read(Ref, Offset, Count, State) ->
case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, current_offset = OldOffset } ->
+ Entry = #entry { hdl = Hdl, current_offset = OldOffset,
+ write_buffer = WriteBuffer } ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> file:write(Hdl, lists:reverse(WriteBuffer))
+ end,
NewOffset = Count +
case Offset of
cur -> OldOffset;
@@ -95,7 +101,8 @@ read(Ref, Offset, Count, State) ->
end,
put({fhc, ref_entry, Ref},
Entry #entry { current_offset = NewOffset,
- at_eof = Offset =:= eof }),
+ at_eof = Offset =:= eof,
+ write_buffer = [] }),
{file:read(Hdl, Count), State};
undefined -> {{error, not_open}, State}
end.
@@ -105,7 +112,8 @@ read(Ref, Offset, Count, State) ->
write(Ref, Offset, Data, State) ->
case get({fhc, ref_entry, Ref}) of
Entry = #entry { hdl = Hdl, current_offset = OldOffset,
- is_append = IsAppend, at_eof = AtEoF } ->
+ is_append = IsAppend, at_eof = AtEoF,
+ write_buffer = WriteBuffer } ->
NewOffset =
case IsAppend of
true ->
@@ -119,25 +127,28 @@ write(Ref, Offset, Data, State) ->
RealOff
end
end,
+ WriteBuffer1 = [Data | WriteBuffer],
put({fhc, ref_entry, Ref},
Entry #entry { current_offset = NewOffset,
- is_dirty = true, at_eof = Offset =:= eof }),
- {file:write(Hdl, Data), State};
+ at_eof = Offset =:= eof,
+ write_buffer = WriteBuffer1 }),
+ {ok, State};
undefined -> {{error, not_open}, State}
end.
sync(Ref, State) ->
case get({fhc, ref_entry, Ref}) of
+ #entry { write_buffer = [] } -> {ok, State};
Entry = #entry { hdl = Hdl, current_offset = Offset,
last_sync_offset = LastSyncOffset,
- is_dirty = true } ->
+ write_buffer = WriteBuffer } ->
SyncOffset = lists:max([Offset, LastSyncOffset]),
+ ok = file:write(Hdl, lists:reverse(WriteBuffer)),
ok = file:sync(Hdl),
put({fhc, ref_entry, Ref},
Entry #entry { last_sync_offset = SyncOffset,
- is_dirty = false }),
+ write_buffer = [] }),
{ok, State};
- #entry { is_dirty = false } -> {ok, State};
undefined -> {{error, not_open}, State}
end.
@@ -147,10 +158,15 @@ position(Ref, NewOffset, State) ->
{ok, State};
#entry { at_eof = true } when NewOffset =:= eof ->
{ok, State};
- Entry = #entry { hdl = Hdl } ->
+ Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> file:write(Hdl, lists:reverse(WriteBuffer))
+ end,
{ok, RealOff} = file:position(Hdl, NewOffset),
put({fhc, ref_entry, Ref},
Entry #entry { current_offset = RealOff,
+ write_buffer = [],
at_eof = NewOffset =:= eof }),
{ok, State};
undefined ->
@@ -159,9 +175,14 @@ position(Ref, NewOffset, State) ->
truncate(Ref, State) ->
case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl } ->
+ Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> file:write(Hdl, lists:reverse(WriteBuffer))
+ end,
ok = file:truncate(Hdl),
- put({fhc, ref_entry, Ref}, Entry #entry { at_eof = true }),
+ put({fhc, ref_entry, Ref},
+ Entry #entry { at_eof = true, write_buffer = [] }),
{ok, State};
undefined -> {{error, not_open}, State}
end.
@@ -169,27 +190,22 @@ truncate(Ref, State) ->
with_file_handle_at(Ref, Offset, Fun, State) ->
case get({fhc, ref_entry, Ref}) of
Entry = #entry { hdl = Hdl, current_offset = OldOffset,
- last_sync_offset = LastSyncOffset,
- is_dirty = IsDirty, at_eof = AtEoF } ->
- Offset1 =
- case Offset of
- eof when AtEoF -> OldOffset;
- cur -> OldOffset;
- OldOffset -> OldOffset;
- _ -> {ok, RealOff} = file:position(Hdl, Offset),
- RealOff
- end,
- LastSyncOffset1 =
- case IsDirty of
- true -> ok = file:sync(Hdl),
- lists:max([Offset1, OldOffset]);
- false -> LastSyncOffset
- end,
+ write_buffer = WriteBuffer, at_eof = AtEoF } ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> file:write(Hdl, lists:reverse(WriteBuffer))
+ end,
+ ok = case Offset of
+ eof when AtEoF -> ok;
+ cur -> ok;
+ OldOffset -> ok;
+ _ -> {ok, _RealOff} = file:position(Hdl, Offset),
+ ok
+ end,
{Offset2, Result} = Fun(Hdl),
put({fhc, ref_entry, Ref},
- Entry #entry { current_offset = Offset2,
- last_sync_offset = LastSyncOffset1,
- is_dirty = true, at_eof = false }),
+ Entry #entry { current_offset = Offset2, write_buffer = [],
+ at_eof = false }),
{Result, State};
undefined -> {{error, not_open}, State}
end.
@@ -197,9 +213,14 @@ with_file_handle_at(Ref, Offset, Fun, State) ->
sync_to_offset(Ref, Offset, State) ->
case get({fhc, ref_entry, Ref}) of
Entry = #entry { hdl = Hdl, last_sync_offset = LastSyncOffset,
- current_offset = CurOffset, is_dirty = true }
+ current_offset = CurOffset,
+ write_buffer = [_|_] = WriteBuffer }
when (Offset =:= cur andalso CurOffset > LastSyncOffset)
orelse (Offset > LastSyncOffset) ->
+ ok = case WriteBuffer of
+ [] -> ok;
+ _ -> file:write(Hdl, lists:reverse(WriteBuffer))
+ end,
ok = file:sync(Hdl),
LastSyncOffset1 =
case Offset of
@@ -208,7 +229,7 @@ sync_to_offset(Ref, Offset, State) ->
end,
put({fhc, ref_entry, Ref},
Entry #entry { last_sync_offset = LastSyncOffset1,
- is_dirty = false }),
+ write_buffer = [] }),
{ok, State};
#entry {} -> {ok, State};
error -> {{error, not_open}, State}