summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl56
1 files changed, 33 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4b8759f8f9..d19469d682 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -754,7 +754,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
-get_read_handle(File, Offset, State =
+get_read_handle(File, Offset, TotalSize, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
@@ -766,7 +766,8 @@ get_read_handle(File, Offset, State =
true -> State
end,
Now = now(),
- {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} =
case dict:find(File, ReadHdls) of
error ->
{ok, Hdl} = file:open(form_filename(File),
@@ -774,21 +775,21 @@ get_read_handle(File, Offset, State =
read_ahead]),
case dict:size(ReadHdls) < ReadFileHandlesLimit of
true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
+ {Hdl, 0, ReadHdls, ReadHdlsAge};
+ false ->
{Then, OldFile, ReadHdlsAge2} =
gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
+ {ok, {OldHdl, _Offset, Then}} =
dict:find(OldFile, ReadHdls),
ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ {ok, {Hdl, OldOffset1, Then}} ->
+ {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
end,
- ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl,
+ {FileHdl, Offset /= OldOffset,
State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
sequence_lookup(Sequences, Q) ->
@@ -874,7 +875,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
internal_fetch_body(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
{Message, State2} = read_stored_message(StoreEntry, State1),
@@ -882,7 +883,7 @@ internal_fetch_body(Q, MarkDelivered, Advance, State) ->
end.
internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered,
#message_store_entry { msg_id = MsgId, is_persistent = IsPersistent },
@@ -890,8 +891,8 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
{ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
end.
-with_queue_head(Q, MarkDelivered, Advance,
- State = #dqstate { sequences = Sequences }) ->
+queue_head(Q, MarkDelivered, Advance,
+ State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
{ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
@@ -913,9 +914,10 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
total_size = TotalSize }, State) ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {FileHdl, State1} = get_read_handle(File, Offset, State),
+ {FileHdl, SeekReq, State1} =
+ get_read_handle(File, Offset, TotalSize, State),
{ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
+ read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
insert_into_cache(Message, EncodedBodySize, State1);
@@ -1480,7 +1482,7 @@ close_file(File, State = #dqstate { read_file_handles =
case dict:find(File, ReadHdls) of
error ->
State;
- {ok, {Hdl, Then}} ->
+ {ok, {Hdl, _Offset, Then}} ->
ok = file:close(Hdl),
State #dqstate { read_file_handles =
{ dict:erase(File, ReadHdls),
@@ -1867,10 +1869,17 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
KO -> KO
end.
-read_message_at_offset(FileHdl, Offset, TotalSize) ->
+read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) ->
TotalSizeWriteOkBytes = TotalSize + 1,
- case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} ->
+ SeekRes = case SeekReq of
+ true -> case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} -> ok;
+ KO -> KO
+ end;
+ false -> ok
+ end,
+ case SeekRes of
+ ok ->
case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
{ok, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
@@ -1884,9 +1893,9 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
{ok, {MsgBody, true, BodySize}}
end;
- KO -> KO
+ KO1 -> KO1
end;
- KO -> KO
+ KO2 -> KO2
end.
scan_file_for_valid_messages(File) ->
@@ -1931,7 +1940,8 @@ read_next_file_entry(FileHdl, Offset) ->
{false, false} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgId:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
+ ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
+ TotalSize - 1,
case file:position(FileHdl,
{cur, TotalSize - MsgIdBinSize}
) of