diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:56:34 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-13 13:56:34 +0000 |
| commit | 3b3b8c4f86cb74a9251ad392dfb2e1fa7ee715f9 (patch) | |
| tree | d69c6a31d194fe7fb511188f5948a60fd78d3a77 | |
| parent | cce13da176ec2ccbb89b046b6f5352f253c1a990 (diff) | |
| download | rabbitmq-server-git-3b3b8c4f86cb74a9251ad392dfb2e1fa7ee715f9.tar.gz | |
Oops.
| -rw-r--r-- | src/rabbit_queue_index.erl | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f8d9508471..07ef02a6d1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -307,14 +307,15 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, State#qistate{unconfirmed_msg = UCM1}; {false, _} -> State end), - Body = create_pub_record_body(MsgOrId, MsgProps), + {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, - SeqId:?SEQ_BITS>>, Body]), - maybe_flush_journal(add_to_journal(SeqId, {IsPersistent, Body}, State1)). + SeqId:?SEQ_BITS>>, Bin, MsgBin]), + maybe_flush_journal( + add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -604,21 +605,22 @@ scan_segments(Fun, Acc, State) -> create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, size = Size }) -> + ExpiryBin = expiry_to_binary(Expiry), case MsgOrId of MsgId when is_binary(MsgId) -> - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, - <<0:?EMBEDDED_SIZE_BITS>>]; + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS, + 0:?EMBEDDED_SIZE_BITS>>, <<>>}; #basic_message{id = MsgId} -> MsgBin = term_to_binary(MsgOrId), MsgBinSize = size(MsgBin), - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>, - <<MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin] + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS, + MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin} end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_msg(0, _Hdl) -> {ok, none}; +read_msg(0, _Hdl) -> {ok, <<>>}; read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of {ok, MsgBin} -> {ok, MsgBin}; Else -> Else @@ -635,7 +637,7 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, end, size = Size}, case MsgBin of - none -> {MsgId, Props}; + <<>> -> {MsgId, Props}; _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), {Msg, Props} end. @@ -897,11 +899,11 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {IsPersistent, Body} -> + {IsPersistent, Bin, MsgBin} -> file_handle_cache:append( Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Body]) + RelSeq:?REL_SEQ_BITS>>, Bin, MsgBin]) end, ok = case {Del, Ack} of {no_del, no_ack} -> |
