summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-22 16:23:55 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-22 16:23:55 +0100
commit7d78971a0eef645b34905cd681e48012ce7cf07d (patch)
tree2dfecec671a3c4aa05fde0f1d7ec727a74ad1237
parent966a760267e30aa0d0c3a5b2a841fa9350238689 (diff)
downloadrabbitmq-server-git-7d78971a0eef645b34905cd681e48012ce7cf07d.tar.gz
bug fix in fhc:read (not tracking change in offset). Also made qi use new fhc, and remove hdfhc
-rw-r--r--src/file_handle_cache.erl59
-rw-r--r--src/horrendously_dumb_file_handle_cache.erl246
-rw-r--r--src/rabbit_queue_index.erl78
3 files changed, 91 insertions, 292 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e86344a087..33a69ed78f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -134,12 +134,17 @@ read(Ref, NewOffset, Count, State) ->
case write_buffer(Handle) of
{ok, Handle2} ->
case maybe_seek(NewOffset, Handle2) of
- {ok, Handle3 = #handle { hdl = Hdl }} ->
+ {ok, Handle3 = #handle { hdl = Hdl,
+ offset = Offset }} ->
case file:read(Hdl, Count) of
- {ok, _} = Obj -> {Obj, Handle3};
- eof -> {eof,
- Handle3 #handle { at_eof = true }};
- {error, _} = Error -> {Error, Handle3}
+ {ok, _} = Obj ->
+ {Obj, Handle3 #handle {
+ offset = Offset + Count }};
+ eof ->
+ {eof, Handle3 #handle {
+ at_eof = true }};
+ {error, _} = Error ->
+ {Error, Handle3}
end;
{Error, Handle3} -> {Error, Handle3}
end;
@@ -185,6 +190,50 @@ position(Ref, NewOffset, State) ->
{Result, State}
end.
+sync(Ref, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ Handle = #handle { write_buffer = [], hdl = Hdl, offset = Offset } ->
+ {Result, Handle1} =
+ case file:sync(Hdl) of
+ ok -> {ok, Handle #handle { trusted_offset = Offset }};
+ Error -> {Error, Handle}
+ end,
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
+ {Result, State};
+ Handle = #handle { at_eof = true } ->
+ %% we can't have content in the buffer without being at eof
+ {Result, Handle1} = write_buffer(Handle),
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
+ {Result, State}
+ end.
+
+truncate(Ref, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ Handle = #handle { is_write = true } ->
+ {Result, Handle1} =
+ case write_buffer(Handle) of
+ {ok,
+ Handle2 = #handle { hdl = Hdl, offset = Offset,
+ trusted_offset = TrustedOffset }} ->
+ case file:truncate(Hdl) of
+ ok ->
+ {ok,
+ Handle2 #handle {
+ at_eof = true,
+ trusted_offset = lists:min([Offset,
+ TrustedOffset])
+ }};
+ Error -> {Error, Handle2}
+ end;
+ {Error, Handle2} -> {Error, Handle2}
+ end,
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }),
+ {Result, State};
+ _Handle -> {{error, not_open_for_writing}, State}
+ end.
+
open1(Path, Mode, Options, GRef, State) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
diff --git a/src/horrendously_dumb_file_handle_cache.erl b/src/horrendously_dumb_file_handle_cache.erl
deleted file mode 100644
index e3aa49b3ca..0000000000
--- a/src/horrendously_dumb_file_handle_cache.erl
+++ /dev/null
@@ -1,246 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(horrendously_dumb_file_handle_cache).
-
--export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2,
- position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]).
-
--record(entry,
- { hdl,
- current_offset,
- last_sync_offset,
- write_buffer,
- is_append,
- at_eof,
- path_mode_key }).
-
-init() -> empty_state.
-
-open(Path, Mode, [] = _ExtraOptions, State) ->
- Mode1 = lists:usort(Mode),
- Path1 = filename:absname(Path),
- Key = {Path1, Mode1},
- case get({fhc, path_mode_ref, Key}) of
- {ref, Ref} -> {{ok, Ref}, State};
- undefined ->
- case file:open(Path1, Mode1) of
- {ok, Hdl} ->
- Ref = make_ref(),
- put({fhc, path_mode_ref, Key}, {ref, Ref}),
- Entry = #entry { hdl = Hdl, current_offset = 0,
- last_sync_offset = 0, write_buffer = [],
- is_append = lists:member(append, Mode1),
- at_eof = false, path_mode_key = Key },
- put({fhc, ref_entry, Ref}, Entry),
- {{ok, Ref}, State};
- {error, Error} ->
- {{error, Error}, State}
- end
- end.
-
-close(Ref, State) ->
- {ok,
- case erase({fhc, ref_entry, Ref}) of
- #entry { hdl = Hdl, write_buffer = WriteBuffer, path_mode_key = Key } ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> ok = file:write(Hdl, lists:reverse(WriteBuffer)),
- file:sync(Hdl)
- end,
- ok = file:close(Hdl),
- erase({fhc, path_mode_ref, Key}),
- State;
- undefined -> State
- end}.
-
-release(_Ref, State) -> %% noop for the time being
- {ok, State}.
-
-read(Ref, Offset, Count, State) ->
- case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, current_offset = OldOffset,
- write_buffer = WriteBuffer } ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> file:write(Hdl, lists:reverse(WriteBuffer))
- end,
- NewOffset = Count +
- case Offset of
- cur -> OldOffset;
- _ -> {ok, RealOff} = file:position(Hdl, Offset),
- RealOff
- end,
- put({fhc, ref_entry, Ref},
- Entry #entry { current_offset = NewOffset,
- at_eof = Offset =:= eof,
- write_buffer = [] }),
- {file:read(Hdl, Count), State};
- undefined -> {{error, not_open}, State}
- end.
-
-%% if the file was opened in append mode, then Offset is ignored, as
-%% it would only affect the read head for this file.
-write(Ref, Offset, Data, State) ->
- case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, current_offset = OldOffset,
- is_append = IsAppend, at_eof = AtEoF,
- write_buffer = WriteBuffer } ->
- NewOffset =
- case IsAppend of
- true ->
- OldOffset;
- false ->
- size_of_write_data(Data) +
- case Offset of
- cur -> OldOffset;
- eof when AtEoF -> OldOffset;
- _ -> {ok, RealOff} = file:position(Hdl, Offset),
- RealOff
- end
- end,
- WriteBuffer1 = [Data | WriteBuffer],
- put({fhc, ref_entry, Ref},
- Entry #entry { current_offset = NewOffset,
- at_eof = Offset =:= eof,
- write_buffer = WriteBuffer1 }),
- {ok, State};
- undefined -> {{error, not_open}, State}
- end.
-
-sync(Ref, State) ->
- case get({fhc, ref_entry, Ref}) of
- #entry { write_buffer = [] } -> {ok, State};
- Entry = #entry { hdl = Hdl, current_offset = Offset,
- last_sync_offset = LastSyncOffset,
- write_buffer = WriteBuffer } ->
- SyncOffset = lists:max([Offset, LastSyncOffset]),
- ok = file:write(Hdl, lists:reverse(WriteBuffer)),
- ok = file:sync(Hdl),
- put({fhc, ref_entry, Ref},
- Entry #entry { last_sync_offset = SyncOffset,
- write_buffer = [] }),
- {ok, State};
- undefined -> {{error, not_open}, State}
- end.
-
-position(Ref, NewOffset, State) ->
- case get({fhc, ref_entry, Ref}) of
- #entry { current_offset = NewOffset } ->
- {ok, State};
- #entry { at_eof = true } when NewOffset =:= eof ->
- {ok, State};
- Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> file:write(Hdl, lists:reverse(WriteBuffer))
- end,
- {ok, RealOff} = file:position(Hdl, NewOffset),
- put({fhc, ref_entry, Ref},
- Entry #entry { current_offset = RealOff,
- write_buffer = [],
- at_eof = NewOffset =:= eof }),
- {ok, State};
- undefined ->
- {{error, not_open}, State}
- end.
-
-truncate(Ref, State) ->
- case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> file:write(Hdl, lists:reverse(WriteBuffer))
- end,
- ok = file:truncate(Hdl),
- put({fhc, ref_entry, Ref},
- Entry #entry { at_eof = true, write_buffer = [] }),
- {ok, State};
- undefined -> {{error, not_open}, State}
- end.
-
-with_file_handle_at(Ref, Offset, Fun, State) ->
- case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, current_offset = OldOffset,
- write_buffer = WriteBuffer, at_eof = AtEoF } ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> file:write(Hdl, lists:reverse(WriteBuffer))
- end,
- ok = case Offset of
- eof when AtEoF -> ok;
- cur -> ok;
- OldOffset -> ok;
- _ -> {ok, _RealOff} = file:position(Hdl, Offset),
- ok
- end,
- {Offset2, Result} = Fun(Hdl),
- put({fhc, ref_entry, Ref},
- Entry #entry { current_offset = Offset2, write_buffer = [],
- at_eof = false }),
- {Result, State};
- undefined -> {{error, not_open}, State}
- end.
-
-sync_to_offset(Ref, Offset, State) ->
- case get({fhc, ref_entry, Ref}) of
- Entry = #entry { hdl = Hdl, last_sync_offset = LastSyncOffset,
- current_offset = CurOffset,
- write_buffer = [_|_] = WriteBuffer }
- when (Offset =:= cur andalso CurOffset > LastSyncOffset)
- orelse (Offset > LastSyncOffset) ->
- ok = case WriteBuffer of
- [] -> ok;
- _ -> file:write(Hdl, lists:reverse(WriteBuffer))
- end,
- ok = file:sync(Hdl),
- LastSyncOffset1 =
- case Offset of
- cur -> lists:max([LastSyncOffset, CurOffset]);
- _ -> lists:max([LastSyncOffset, CurOffset, Offset])
- end,
- put({fhc, ref_entry, Ref},
- Entry #entry { last_sync_offset = LastSyncOffset1,
- write_buffer = [] }),
- {ok, State};
- #entry {} -> {ok, State};
- error -> {{error, not_open}, State}
- end.
-
-size_of_write_data(Data) ->
- size_of_write_data(Data, 0).
-
-size_of_write_data([], Acc) ->
- Acc;
-size_of_write_data([A|B], Acc) ->
- size_of_write_data(B, size_of_write_data(A, Acc));
-size_of_write_data(Bin, Acc) when is_binary(Bin) ->
- size(Bin) + Acc.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 48da7e3f33..a50d839c87 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -156,7 +156,7 @@
%%----------------------------------------------------------------------------
init(Name) ->
- HCState = horrendously_dumb_file_handle_cache:init(),
+ HCState = file_handle_cache:init(),
StrName = queue_name_to_dir_name(Name),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
@@ -185,19 +185,19 @@ write_published(MsgId, SeqId, IsPersistent, State)
?MSG_ID_BYTES = size(MsgId),
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} = horrendously_dumb_file_handle_cache:write(
- Hdl, eof,
- <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, MsgId/binary>>,
- State1 #qistate.hc_state),
+ {ok, HCState} =
+ file_handle_cache:append(Hdl,
+ <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, MsgId/binary>>,
+ State1 #qistate.hc_state),
State1 #qistate { hc_state = HCState }.
write_delivered(SeqId, State) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} = horrendously_dumb_file_handle_cache:write(
- Hdl, eof,
+ {ok, HCState} = file_handle_cache:append(
+ Hdl,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>>,
State1 #qistate.hc_state),
@@ -209,9 +209,8 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
{JAckDict1, JAckCount1, HCState} =
lists:foldl(
fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) ->
- {ok, HCStateM} =
- horrendously_dumb_file_handle_cache:write(
- Hdl, eof, <<SeqId:?SEQ_BITS>>, HCStateN),
+ {ok, HCStateM} = file_handle_cache:append(
+ Hdl, <<SeqId:?SEQ_BITS>>, HCStateN),
{add_ack_to_ack_dict(SeqId, JAckDict2),
JAckCount2 + 1, HCStateM}
end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds),
@@ -233,8 +232,7 @@ sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) ->
HCState1 =
dict:fold(
fun (_Key, Hdl, HCStateN) ->
- {ok, HCStateM} =
- horrendously_dumb_file_handle_cache:sync(Hdl, HCStateN),
+ {ok, HCStateM} = file_handle_cache:sync(Hdl, HCStateN),
HCStateM
end, HCState, SegHdls),
State #qistate { hc_state = HCState1 }.
@@ -253,10 +251,8 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict,
JAckCount1 == 0 ->
{Hdl, State3 = #qistate { hc_state = HCState }} =
get_journal_handle(State2),
- {ok, HCState1} =
- horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState),
- {ok, HCState2} =
- horrendously_dumb_file_handle_cache:truncate(Hdl, HCState1),
+ {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState),
+ {ok, HCState2} = file_handle_cache:truncate(Hdl, HCState1),
{false, State3 #qistate { hc_state = HCState2 }};
JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
flush_journal(State2);
@@ -391,8 +387,8 @@ new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) ->
true -> close_all_handles(State);
false -> State
end,
- {{ok, Hdl}, HCState1} =
- horrendously_dumb_file_handle_cache:open(Path, Mode, [], HCState),
+ {{ok, Hdl}, HCState1} =
+ file_handle_cache:open(Path, Mode, [{write_buffer, infinity}], HCState),
{Hdl, State1 #qistate { hc_state = HCState1,
seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}.
@@ -400,8 +396,7 @@ close_handle(Key, State = #qistate { hc_state = HCState,
seg_num_handles = SegHdls }) ->
case dict:find(Key, SegHdls) of
{ok, Hdl} ->
- {ok, HCState1} =
- horrendously_dumb_file_handle_cache:close(Hdl, HCState),
+ {ok, HCState1} = file_handle_cache:close(Hdl, HCState),
State #qistate { hc_state = HCState1,
seg_num_handles = dict:erase(Key, SegHdls) };
error -> State
@@ -411,9 +406,8 @@ close_all_handles(State = #qistate { hc_state = HCState,
seg_num_handles = SegHdls }) ->
HCState1 =
dict:fold(
- fun (_Key, Ref, HCStateN) ->
- {ok, HCStateM} =
- horrendously_dumb_file_handle_cache:close(Ref, HCStateN),
+ fun (_Key, Hdl, HCStateN) ->
+ {ok, HCStateM} = file_handle_cache:close(Hdl, HCStateN),
HCStateM
end, HCState, SegHdls),
State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }.
@@ -527,8 +521,7 @@ scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
{TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}.
load_journal(Hdl, ADict, HCState) ->
- case horrendously_dumb_file_handle_cache:read(
- Hdl, cur, ?SEQ_BYTES, HCState) of
+ case file_handle_cache:read(Hdl, cur, ?SEQ_BYTES, HCState) of
{{ok, <<SeqId:?SEQ_BITS>>}, HCState1} ->
load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1);
{_ErrOrEoF, HCState1} -> {ADict, HCState1}
@@ -561,11 +554,16 @@ deliver_transient(SegNum, SDict, State) ->
{[RelSeq | AckMeAcc], DeliverMeAcc}
end, {[], []}, SDict),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} = horrendously_dumb_file_handle_cache:write(
- Hdl, eof,
- [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ],
- State1 #qistate.hc_state),
+ {ok, HCState} =
+ case DeliverMe of
+ [] -> {ok, State1 #qistate.hc_state};
+ _ ->
+ file_handle_cache:append(
+ Hdl,
+ [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ],
+ State1 #qistate.hc_state)
+ end,
{AckMe, State1 #qistate { hc_state = HCState }}.
@@ -585,9 +583,7 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
{Hdl, State1 = #qistate { hc_state = HCState,
journal_ack_dict = JAckDict }} =
get_seg_handle(SegNum, State),
- {ok, HCState1} =
- horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState),
-
+ {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState),
{SDict, AckCount, HighRelSeq, HCState2} =
load_segment_entries(Hdl, dict:new(), 0, 0, HCState1),
RelSeqs = case dict:find(SegNum, JAckDict) of
@@ -603,11 +599,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
end.
load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
- case horrendously_dumb_file_handle_cache:read(Hdl, cur, 1, HCState) of
+ case file_handle_cache:read(Hdl, cur, 1, HCState) of
{{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} ->
{{ok, LSB}, HCState2} =
- horrendously_dumb_file_handle_cache:read(
+ file_handle_cache:read(
Hdl, cur, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
@@ -617,7 +613,7 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
{{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} =
- horrendously_dumb_file_handle_cache:read(
+ file_handle_cache:read(
Hdl, cur, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
@@ -673,11 +669,11 @@ append_acks_to_segment(SegNum, AckCount, Acks, State)
lists:foldl(
fun (RelSeq, {AckCount2, HCStateN}) ->
{ok, HCStateM} =
- horrendously_dumb_file_handle_cache:write(
- Hdl, eof,
+ file_handle_cache:append(
+ Hdl,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>>, HCStateN),
{AckCount2 + 1, HCStateM}
end, {AckCount, State1 #qistate.hc_state}, Acks),
- {ok, HCState1} = horrendously_dumb_file_handle_cache:sync(Hdl, HCState),
+ {ok, HCState1} = file_handle_cache:sync(Hdl, HCState),
{AckCount1, State1 #qistate { hc_state = HCState1 }}.