diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 18:44:24 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 18:44:24 +0000 |
| commit | b29eab1cfe77086e7c5dd5af84c226b7ef05d43a (patch) | |
| tree | 172033d6aeae36f11f78eb2e5ef848320ded1d9e | |
| parent | 2a6a784fd6088f01b6752fb94499aa539388836d (diff) | |
| download | rabbitmq-server-git-b29eab1cfe77086e7c5dd5af84c226b7ef05d43a.tar.gz | |
Also, switch to reading the journal in one go. We don't care about this for performance, but it's probably a simplification.
| -rw-r--r-- | src/rabbit_queue_index.erl | 71 |
1 files changed, 27 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0dfbc1ffd7..b56a990791 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -619,12 +619,6 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_msg(0, _Hdl) -> {ok, <<>>}; -read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of - {ok, MsgBin} -> {ok, MsgBin}; - Else -> Else - end. - parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS>>, MsgBin) -> %% work around for binary data fragmentation. See @@ -735,10 +729,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State = #qistate { dir = Dir }) -> - case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of + Path = filename:join(Dir, ?JOURNAL_FILENAME), + case rabbit_file:is_file(Path) of true -> {JournalHdl, State1} = get_journal_handle(State), + Size = rabbit_file:file_size(Path), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1); + {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size), + parse_journal_entries(JournalBin, State1); false -> State end. @@ -762,42 +759,28 @@ recover_journal(State) -> end, Segments), State1 #qistate { segments = Segments1 }. -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of - %% Journal entry composed only of zeroes was probably - %% produced during a dirty shutdown so stop reading - {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} -> - State; - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, - MsgSize:?EMBEDDED_SIZE_BITS>>} -> - case read_msg(MsgSize, Hdl) of - {ok, MsgBin} -> - IsPersistent = - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {IsPersistent, Bin, MsgBin}, - State)); - _ErrOrEoF -> - State - end; - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. +parse_journal_entries(<<?DEL_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, del, State)); + +parse_journal_entries(<<?ACK_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, ack, State)); +parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS, + 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + State; +parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary, + Rest/binary>>, State) -> + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + parse_journal_entries( + Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State)); +parse_journal_entries(_ErrOrEoF, State) -> + State. deliver_or_ack(_Kind, [], State) -> State; |
