summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-20 18:44:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-20 18:44:24 +0000
commitb29eab1cfe77086e7c5dd5af84c226b7ef05d43a (patch)
tree172033d6aeae36f11f78eb2e5ef848320ded1d9e
parent2a6a784fd6088f01b6752fb94499aa539388836d (diff)
downloadrabbitmq-server-git-b29eab1cfe77086e7c5dd5af84c226b7ef05d43a.tar.gz
Also, switch to reading the journal in one go. We don't care about this for performance, but it's probably a simplification.
-rw-r--r--src/rabbit_queue_index.erl71
1 files changed, 27 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0dfbc1ffd7..b56a990791 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -619,12 +619,6 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_msg(0, _Hdl) -> {ok, <<>>};
-read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of
- {ok, MsgBin} -> {ok, MsgBin};
- Else -> Else
- end.
-
parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
Size:?SIZE_BITS>>, MsgBin) ->
%% work around for binary data fragmentation. See
@@ -735,10 +729,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
- case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ case rabbit_file:is_file(Path) of
true -> {JournalHdl, State1} = get_journal_handle(State),
+ Size = rabbit_file:file_size(Path),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1);
+ {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
+ parse_journal_entries(JournalBin, State1);
false -> State
end.
@@ -762,42 +759,28 @@ recover_journal(State) ->
end, Segments),
State1 #qistate { segments = Segments1 }.
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
- %% Journal entry composed only of zeroes was probably
- %% produced during a dirty shutdown so stop reading
- {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} ->
- State;
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
- MsgSize:?EMBEDDED_SIZE_BITS>>} ->
- case read_msg(MsgSize, Hdl) of
- {ok, MsgBin} ->
- IsPersistent =
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end,
- load_journal_entries(
- add_to_journal(
- SeqId, {IsPersistent, Bin, MsgBin},
- State));
- _ErrOrEoF ->
- State
- end;
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
+parse_journal_entries(<<?DEL_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
+
+parse_journal_entries(<<?ACK_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
+parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
+ 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ State;
+parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
+ Rest/binary>>, State) ->
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ parse_journal_entries(
+ Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
+parse_journal_entries(_ErrOrEoF, State) ->
+ State.
deliver_or_ack(_Kind, [], State) ->
State;