summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-21 12:19:10 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-21 12:19:10 +0000
commit043dd23be06db4015aaf32ffb282cf162d92c8ca (patch)
treebb5ac7e77a8a3858b538503bf4464f7fc5402564
parent79b28c03bea34d1536344201cf2e839971eb7fe1 (diff)
parent0068d17d865761cadedf4de278664f78cccccff7 (diff)
downloadrabbitmq-server-git-043dd23be06db4015aaf32ffb282cf162d92c8ca.tar.gz
Merge bug26543
-rw-r--r--src/rabbit_queue_index.erl196
1 files changed, 87 insertions, 109 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 77d24f6f7f..676cb6e043 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -619,12 +619,6 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_msg(0, _Hdl) -> {ok, <<>>};
-read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of
- {ok, MsgBin} -> {ok, MsgBin};
- Else -> Else
- end.
-
parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
Size:?SIZE_BITS>>, MsgBin) ->
%% work around for binary data fragmentation. See
@@ -710,9 +704,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ _ -> Seg = array:sparse_foldr(
+ fun entry_to_segment/3, [], JEntries),
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
- array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ file_handle_cache:append(Hdl, Seg),
ok = file_handle_cache:close(Hdl),
Segment #segment { journal_entries = array_new() }
end.
@@ -731,10 +729,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
- case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ case rabbit_file:is_file(Path) of
true -> {JournalHdl, State1} = get_journal_handle(State),
+ Size = rabbit_file:file_size(Path),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1);
+ {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
+ parse_journal_entries(JournalBin, State1);
false -> State
end.
@@ -758,42 +759,30 @@ recover_journal(State) ->
end, Segments),
State1 #qistate { segments = Segments1 }.
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
- %% Journal entry composed only of zeroes was probably
- %% produced during a dirty shutdown so stop reading
- {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} ->
- State;
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
- MsgSize:?EMBEDDED_SIZE_BITS>>} ->
- case read_msg(MsgSize, Hdl) of
- {ok, MsgBin} ->
- IsPersistent =
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end,
- load_journal_entries(
- add_to_journal(
- SeqId, {IsPersistent, Bin, MsgBin},
- State));
- _ErrOrEoF ->
- State
- end;
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
+
+parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
+parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
+ 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ State;
+parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
+ Rest/binary>>, State) ->
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ parse_journal_entries(
+ Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
+parse_journal_entries(_ErrOrEoF, State) ->
+ State.
deliver_or_ack(_Kind, [], State) ->
State;
@@ -893,34 +882,31 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) ->
- Hdl;
-write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
- ok = case Pub of
- no_pub ->
- ok;
- {IsPersistent, Bin, MsgBin} ->
- file_handle_cache_stats:update(queue_index_write),
- file_handle_cache:append(
- Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, Bin/binary,
- (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin])
- end,
- ok = case {Del, Ack} of
- {no_del, no_ack} ->
- ok;
- _ ->
- Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- file_handle_cache_stats:update(queue_index_write),
- file_handle_cache:append(
- Hdl, case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end)
- end,
- Hdl.
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
+ Buf;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+ %% NB: we are assembling the segment in reverse order here, so
+ %% del/ack comes first.
+ Buf1 = case {Del, Ack} of
+ {no_del, no_ack} ->
+ Buf;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Buf];
+ _ -> [Binary | Buf]
+ end
+ end,
+ case Pub of
+ no_pub ->
+ Buf1;
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
+ end.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
@@ -954,46 +940,38 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(
- Path, ?READ_MODE,
- [{read_buffer, ?READ_BUFFER_SIZE}]),
+ true -> Size = rabbit_file:file_size(Path),
+ file_handle_cache_stats:update(queue_index_read),
+ {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = load_segment_entries(Hdl, KeepAcked, Empty),
+ {ok, SegBin} = file_handle_cache:read(Hdl, Size),
ok = file_handle_cache:close(Hdl),
+ Res = parse_segment_entries(SegBin, KeepAcked, Empty),
Res
end.
-load_segment_entries(Hdl, KeepAcked, Acc) ->
- file_handle_cache_stats:update(queue_index_read),
- case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
- {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- load_segment_entries(
- Hdl, KeepAcked,
- load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
- load_segment_entries(
- Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
- eof -> %% TODO or maybe _
- Acc
- end.
-
-load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
- MsgSize:?EMBEDDED_SIZE_BITS>>} ->
- case read_msg(MsgSize, Hdl) of
- {ok, MsgBin} ->
- Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- {SegEntries1, Unacked + 1};
- _ ->
- {SegEntries, Unacked}
- end;
- _ ->
- {SegEntries, Unacked}
- end.
+parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
+ KeepAcked, Acc) ->
+ parse_segment_publish_entry(
+ Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
+parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
+ parse_segment_entries(
+ Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+parse_segment_entries(<<>>, _KeepAcked, Acc) ->
+ Acc.
+
+parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS,
+ MsgBin:MsgSize/binary, Rest/binary>>,
+ IsPersistent, RelSeq, KeepAcked,
+ {SegEntries, Unacked}) ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
+parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
+ parse_segment_entries(Rest, KeepAcked, Acc).
add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
case array:get(RelSeq, SegEntries) of