diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:19:04 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:19:04 +0000 |
| commit | e861016a8e5b47500a615cfc9bbcfad85e5d36ba (patch) | |
| tree | 76bcf60bbc78e4fed8794f59916cb9fe0cd0333d /src | |
| parent | 1ec1e79a2ecdaad03fa9335f74fbee3726eff79e (diff) | |
| download | rabbitmq-server-git-e861016a8e5b47500a615cfc9bbcfad85e5d36ba.tar.gz | |
Finish the job started by 2f668f7b457f; make sure we always hold messages in memory in the serialised form.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 58 |
1 files changed, 37 insertions, 21 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e36797a91a..1c348c7f5b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -34,7 +34,9 @@ %% metadata about the message; the delivery and acknowledgement %% records just contain the sequence ID. A publish record may also %% contain the complete message if provided to publish/5; this allows -%% the message store to be avoided altogether for small messages. +%% the message store to be avoided altogether for small messages. In +%% either case the publish record is stored in memory in the same +%% serialised format it will take on disk. %% %% Because of the fact that the queue can decide at any point to send %% a queue entry to disk, you can not rely on publishes appearing in @@ -91,7 +93,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{IsPersistent, Body}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -165,7 +167,7 @@ %% ---- misc ---- --define(PUB, {_, _}). +-define(PUB, {_, _}). %% {IsPersistent, Body} -define(READ_MODE, [binary, raw, read]). -define(WRITE_MODE, [write | ?READ_MODE]). @@ -509,9 +511,10 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + fun (RelSeq, {{IsPersistent, Body}, Del, no_ack}, {SegmentAndDirtyCount, Bytes}) -> - {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body), + {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, Del, RelSeq, SegmentAndDirtyCount), Bytes + case IsPersistent of true -> MsgProps#message_properties.size; @@ -615,9 +618,23 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>, +read_pub_record_body(<<_:?MSG_ID_BITS, _:?EXPIRY_BITS, + _:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>> = Bin, Hdl) -> + case IndexSize of + 0 -> {ok, [Bin]}; + _ -> case file_handle_cache:read(Hdl, IndexSize) of + {ok, MsgBin} -> {ok, [Bin, MsgBin]}; + Else -> Else + end + end. + +parse_pub_record_body(IoList) when is_list(IoList) -> + parse_pub_record_body(iolist_to_binary(IoList)); + +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS, _:?MSG_IN_INDEX_SIZE_BITS, + Rest/binary>>) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -626,14 +643,10 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, X -> X end, size = Size}, - case IndexSize of - 0 -> {ok, MsgId, Props}; - _ -> case file_handle_cache:read(Hdl, IndexSize) of - {ok, MsgBin} -> Msg = #basic_message{id = MsgId} = - binary_to_term(MsgBin), - {ok, Msg, Props}; - Else -> Else - end + case Rest of + <<>> -> {ok, MsgId, Props}; + _ -> Msg = #basic_message{id = MsgId} = binary_to_term(Rest), + {ok, Msg, Props} end. %%---------------------------------------------------------------------------- @@ -769,16 +782,15 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> State; {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> case read_pub_record_body(Bin, Hdl) of - {ok, MsgOrId, Props} -> + {ok, Body} -> IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end, - B = create_pub_record_body(MsgOrId, Props), load_journal_entries( add_to_journal( - SeqId, {IsPersistent, B}, State)); + SeqId, {IsPersistent, Body}, State)); _ErrOrEoF -> State end; @@ -931,7 +943,11 @@ segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> {SegEntries, _UnackedCount} = load_segment(false, Segment), {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), - array:sparse_foldr(Fun, Init, SegEntries1). + array:sparse_foldr( + fun (RelSeq, {{IsPersistent, Body}, Del, Ack}, Acc) -> + {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body), + Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc) + end, Init, SegEntries1). %% Loading segments %% @@ -968,8 +984,8 @@ load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} -> case read_pub_record_body(PubRecordBody, Hdl) of - {ok, MsgOrId, MsgProps} -> - Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack}, + {ok, Body} -> + Obj = {{IsPersistent, Body}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), {SegEntries1, Unacked + 1}; _ -> |
