diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:40:24 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:40:24 +0000 |
| commit | cce13da176ec2ccbb89b046b6f5352f253c1a990 (patch) | |
| tree | b98399919240dc76b7e2a57c8919d8ef60ae4c29 /src | |
| parent | e861016a8e5b47500a615cfc9bbcfad85e5d36ba (diff) | |
| download | rabbitmq-server-git-cce13da176ec2ccbb89b046b6f5352f253c1a990.tar.gz | |
Simplify reading message body.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 86 |
1 files changed, 40 insertions, 46 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1c348c7f5b..f8d9508471 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -154,20 +154,20 @@ %% This is the size of the message record embedded in the queue %% index. If 0, the message can be found in the message store. --define(MSG_IN_INDEX_SIZE_BYTES, 4). --define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)). +-define(EMBEDDED_SIZE_BYTES, 4). +-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). +%% + 4 for size +-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)). -%% 16 bytes for md5sum + 8 for expiry + 4 for size --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES + - ?MSG_IN_INDEX_SIZE_BYTES)). %% + 2 for seq, bits and prefix -define(PUB_RECORD_PREFIX_BYTES, 2). --define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)). - %% ---- misc ---- --define(PUB, {_, _}). %% {IsPersistent, Body} +-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin} -define(READ_MODE, [binary, raw, read]). -define(WRITE_MODE, [write | ?READ_MODE]). @@ -511,9 +511,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{IsPersistent, Body}, Del, no_ack}, + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack}, {SegmentAndDirtyCount, Bytes}) -> - {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body), + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, Del, RelSeq, SegmentAndDirtyCount), Bytes + case IsPersistent of @@ -607,34 +607,25 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, case MsgOrId of MsgId when is_binary(MsgId) -> [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, - <<0:?MSG_IN_INDEX_SIZE_BITS>>]; + <<0:?EMBEDDED_SIZE_BITS>>]; #basic_message{id = MsgId} -> MsgBin = term_to_binary(MsgOrId), MsgBinSize = size(MsgBin), [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, - <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin] + <<MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin] end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_pub_record_body(<<_:?MSG_ID_BITS, _:?EXPIRY_BITS, - _:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>> = Bin, - Hdl) -> - case IndexSize of - 0 -> {ok, [Bin]}; - _ -> case file_handle_cache:read(Hdl, IndexSize) of - {ok, MsgBin} -> {ok, [Bin, MsgBin]}; - Else -> Else - end - end. - -parse_pub_record_body(IoList) when is_list(IoList) -> - parse_pub_record_body(iolist_to_binary(IoList)); +read_msg(0, _Hdl) -> {ok, none}; +read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of + {ok, MsgBin} -> {ok, MsgBin}; + Else -> Else + end. parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS, _:?MSG_IN_INDEX_SIZE_BITS, - Rest/binary>>) -> + Size:?SIZE_BITS>>, MsgBin) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -643,10 +634,10 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, X -> X end, size = Size}, - case Rest of - <<>> -> {ok, MsgId, Props}; - _ -> Msg = #basic_message{id = MsgId} = binary_to_term(Rest), - {ok, Msg, Props} + case MsgBin of + none -> {MsgId, Props}; + _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), + {Msg, Props} end. %%---------------------------------------------------------------------------- @@ -775,14 +766,15 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of + case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of %% Journal entry composed only of zeroes was probably %% produced during a dirty shutdown so stop reading - {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> + {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} -> State; - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - case read_pub_record_body(Bin, Hdl) of - {ok, Body} -> + {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS>>} -> + case read_msg(MsgSize, Hdl) of + {ok, MsgBin} -> IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; @@ -790,7 +782,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> end, load_journal_entries( add_to_journal( - SeqId, {IsPersistent, Body}, State)); + SeqId, {IsPersistent, Bin, MsgBin}, + State)); _ErrOrEoF -> State end; @@ -944,8 +937,8 @@ segment_entries_foldr(Fun, Init, {SegEntries, _UnackedCount} = load_segment(false, Segment), {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldr( - fun (RelSeq, {{IsPersistent, Body}, Del, Ack}, Acc) -> - {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body), + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) -> + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc) end, Init, SegEntries1). @@ -981,11 +974,12 @@ load_segment_entries(Hdl, KeepAcked, Acc) -> end. load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> - case read_pub_record_body(PubRecordBody, Hdl) of - {ok, Body} -> - Obj = {{IsPersistent, Body}, no_del, no_ack}, + case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of + {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS>>} -> + case read_msg(MsgSize, Hdl) of + {ok, MsgBin} -> + Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), {SegEntries1, Unacked + 1}; _ -> @@ -1225,7 +1219,7 @@ store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, Rest/binary>>) -> {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, - 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; + 0:?EMBEDDED_SIZE_BITS>>, Rest}; store_msg_journal(_) -> stop. @@ -1234,7 +1228,7 @@ store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, - 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; + 0:?EMBEDDED_SIZE_BITS>>, Rest}; store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, |
