summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:56:34 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:56:34 +0000
commit3b3b8c4f86cb74a9251ad392dfb2e1fa7ee715f9 (patch)
treed69c6a31d194fe7fb511188f5948a60fd78d3a77
parentcce13da176ec2ccbb89b046b6f5352f253c1a990 (diff)
downloadrabbitmq-server-git-3b3b8c4f86cb74a9251ad392dfb2e1fa7ee715f9.tar.gz
Oops.
-rw-r--r--src/rabbit_queue_index.erl24
1 files changed, 13 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f8d9508471..07ef02a6d1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -307,14 +307,15 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent,
State#qistate{unconfirmed_msg = UCM1};
{false, _} -> State
end),
- Body = create_pub_record_body(MsgOrId, MsgProps),
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>, Body]),
- maybe_flush_journal(add_to_journal(SeqId, {IsPersistent, Body}, State1)).
+ SeqId:?SEQ_BITS>>, Bin, MsgBin]),
+ maybe_flush_journal(
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -604,21 +605,22 @@ scan_segments(Fun, Acc, State) ->
create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
size = Size }) ->
+ ExpiryBin = expiry_to_binary(Expiry),
case MsgOrId of
MsgId when is_binary(MsgId) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
- <<0:?EMBEDDED_SIZE_BITS>>];
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, <<>>};
#basic_message{id = MsgId} ->
MsgBin = term_to_binary(MsgOrId),
MsgBinSize = size(MsgBin),
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
- <<MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin]
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS,
+ MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin}
end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_msg(0, _Hdl) -> {ok, none};
+read_msg(0, _Hdl) -> {ok, <<>>};
read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of
{ok, MsgBin} -> {ok, MsgBin};
Else -> Else
@@ -635,7 +637,7 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
end,
size = Size},
case MsgBin of
- none -> {MsgId, Props};
+ <<>> -> {MsgId, Props};
_ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
{Msg, Props}
end.
@@ -897,11 +899,11 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {IsPersistent, Body} ->
+ {IsPersistent, Bin, MsgBin} ->
file_handle_cache:append(
Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, Body])
+ RelSeq:?REL_SEQ_BITS>>, Bin, MsgBin])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->