diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4b8759f8f9..d19469d682 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -754,7 +754,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). -get_read_handle(File, Offset, State = +get_read_handle(File, Offset, TotalSize, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit, current_file_name = CurName, @@ -766,7 +766,8 @@ get_read_handle(File, Offset, State = true -> State end, Now = now(), - {FileHdl, ReadHdls1, ReadHdlsAge1} = + NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} = case dict:find(File, ReadHdls) of error -> {ok, Hdl} = file:open(form_filename(File), @@ -774,21 +775,21 @@ get_read_handle(File, Offset, State = read_ahead]), case dict:size(ReadHdls) < ReadFileHandlesLimit of true -> - {Hdl, ReadHdls, ReadHdlsAge}; - _False -> + {Hdl, 0, ReadHdls, ReadHdlsAge}; + false -> {Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, Then}} = + {ok, {OldHdl, _Offset, Then}} = dict:find(OldFile, ReadHdls), ok = file:close(OldHdl), - {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} end; - {ok, {Hdl, Then}} -> - {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + {ok, {Hdl, OldOffset1, Then}} -> + {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} end, - ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1), ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, + {FileHdl, Offset /= OldOffset, State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}. sequence_lookup(Sequences, Q) -> @@ -874,7 +875,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- internal_fetch_body(Q, MarkDelivered, Advance, State) -> - case with_queue_head(Q, MarkDelivered, Advance, State) of + case queue_head(Q, MarkDelivered, Advance, State) of E = {ok, empty, _} -> E; {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} -> {Message, State2} = read_stored_message(StoreEntry, State1), @@ -882,7 +883,7 @@ internal_fetch_body(Q, MarkDelivered, Advance, State) -> end. internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> - case with_queue_head(Q, MarkDelivered, Advance, State) of + case queue_head(Q, MarkDelivered, Advance, State) of E = {ok, empty, _} -> E; {ok, AckTag, IsDelivered, #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent }, @@ -890,8 +891,8 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} end. -with_queue_head(Q, MarkDelivered, Advance, - State = #dqstate { sequences = Sequences }) -> +queue_head(Q, MarkDelivered, Advance, + State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> @@ -913,9 +914,10 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, total_size = TotalSize }, State) -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {FileHdl, State1} = get_read_handle(File, Offset, State), + {FileHdl, SeekReq, State1} = + get_read_handle(File, Offset, TotalSize, State), {ok, {MsgBody, _IsPersistent, EncodedBodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), + read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq), Message = #basic_message {} = bin_to_msg(MsgBody), ok = if RefCount > 1 -> insert_into_cache(Message, EncodedBodySize, State1); @@ -1480,7 +1482,7 @@ close_file(File, State = #dqstate { read_file_handles = case dict:find(File, ReadHdls) of error -> State; - {ok, {Hdl, Then}} -> + {ok, {Hdl, _Offset, Then}} -> ok = file:close(Hdl), State #dqstate { read_file_handles = { dict:erase(File, ReadHdls), @@ -1867,10 +1869,17 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> KO -> KO end. -read_message_at_offset(FileHdl, Offset, TotalSize) -> +read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) -> TotalSizeWriteOkBytes = TotalSize + 1, - case file:position(FileHdl, {bof, Offset}) of - {ok, Offset} -> + SeekRes = case SeekReq of + true -> case file:position(FileHdl, {bof, Offset}) of + {ok, Offset} -> ok; + KO -> KO + end; + false -> ok + end, + case SeekRes of + ok -> case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, @@ -1884,9 +1893,9 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> {ok, {MsgBody, true, BodySize}} end; - KO -> KO + KO1 -> KO1 end; - KO -> KO + KO2 -> KO2 end. scan_file_for_valid_messages(File) -> @@ -1931,7 +1940,8 @@ read_next_file_entry(FileHdl, Offset) -> {false, false} -> %% all good, let's continue case file:read(FileHdl, MsgIdBinSize) of {ok, <<MsgId:MsgIdBinSize/binary>>} -> - ExpectedAbsPos = Offset + TwoIntegers + TotalSize, + ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + + TotalSize - 1, case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize} ) of |
