summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-22 17:54:07 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-22 17:54:07 +0100
commitabf7da7eb8bc832f594bf030d7ea6c709ad650ff (patch)
treefca187f68fb072cb8fd50162b17f5be93749d791 /src
parent91edec707b01d130854686ad2de589ec01175f2e (diff)
downloadrabbitmq-server-git-abf7da7eb8bc832f594bf030d7ea6c709ad650ff.tar.gz
drop the offset arg to read, also account for the possibility that we didn't read as much of the file as we asked for. All tests pass.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl39
-rw-r--r--src/rabbit_queue_index.erl8
2 files changed, 15 insertions, 32 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6959dcc27f..fe86044bc1 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -31,7 +31,7 @@
-module(file_handle_cache).
--export([init/0, open/4, close/2, release/2, read/4, append/3, sync/2,
+-export([init/0, open/4, close/2, release/2, read/3, append/3, sync/2,
position/3, truncate/2, last_sync_offset/2]).
-record(file,
@@ -134,28 +134,21 @@ close(Ref, State) ->
release(_Ref, State) -> %% noop just for now
{ok, State}.
-read(Ref, NewOffset, Count, State) ->
+read(Ref, Count, State) ->
case get_or_reopen(Ref, State) of
{{ok, #handle { is_read = false }}, State1} ->
{{error, not_open_for_reading}, State1};
{{ok, Handle}, State1} ->
{Result, Handle1} =
case write_buffer(Handle) of
- {ok, Handle2} ->
- case maybe_seek(NewOffset, Handle2) of
- {ok, Handle3 = #handle { hdl = Hdl,
- offset = Offset }} ->
- case file:read(Hdl, Count) of
- {ok, _} = Obj ->
- {Obj, Handle3 #handle {
- offset = Offset + Count }};
- eof ->
- {eof, Handle3 #handle {
- at_eof = true }};
- {error, _} = Error ->
- {Error, Handle3}
- end;
- {Error, Handle3} -> {Error, Handle3}
+ {ok, Handle2 = #handle { hdl = Hdl, offset = Offset }} ->
+ case file:read(Hdl, Count) of
+ {ok, Data} = Obj ->
+ Size = iolist_size(Data),
+ {Obj,
+ Handle2 #handle { offset = Offset + Size }};
+ eof -> {eof, Handle2 #handle { at_eof = true }};
+ Error -> {Error, Handle2}
end;
{Error, Handle2} -> {Error, Handle2}
end,
@@ -299,7 +292,7 @@ write_to_buffer(Data, Handle =
#handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
write_buffer_size_limit = Limit }) ->
- Size1 = Size + size_of_write_data(Data),
+ Size1 = Size + iolist_size(Data),
Handle1 = Handle #handle { write_buffer = [ Data | WriteBuffer ],
write_buffer_size = Size1 },
case Limit /= infinity andalso Size1 > Limit of
@@ -322,16 +315,6 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
-size_of_write_data(Data) ->
- size_of_write_data(Data, 0).
-
-size_of_write_data([], Acc) ->
- Acc;
-size_of_write_data([A|B], Acc) ->
- size_of_write_data(B, size_of_write_data(A, Acc));
-size_of_write_data(Bin, Acc) when is_binary(Bin) ->
- size(Bin) + Acc.
-
is_reader(Mode) -> lists:member(read, Mode).
is_writer(Mode) -> lists:member(write, Mode).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a50d839c87..50f013f89b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -521,7 +521,7 @@ scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
{TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}.
load_journal(Hdl, ADict, HCState) ->
- case file_handle_cache:read(Hdl, cur, ?SEQ_BYTES, HCState) of
+ case file_handle_cache:read(Hdl, ?SEQ_BYTES, HCState) of
{{ok, <<SeqId:?SEQ_BITS>>}, HCState1} ->
load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1);
{_ErrOrEoF, HCState1} -> {ADict, HCState1}
@@ -599,12 +599,12 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
end.
load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
- case file_handle_cache:read(Hdl, cur, 1, HCState) of
+ case file_handle_cache:read(Hdl, 1, HCState) of
{{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} ->
{{ok, LSB}, HCState2} =
file_handle_cache:read(
- Hdl, cur, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1),
+ Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq, HCState2);
@@ -614,7 +614,7 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
%% bytes, the size spec is in bytes, not bits.
{{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} =
file_handle_cache:read(
- Hdl, cur, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1),
+ Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(