summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:40:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:40:24 +0000
commitcce13da176ec2ccbb89b046b6f5352f253c1a990 (patch)
treeb98399919240dc76b7e2a57c8919d8ef60ae4c29 /src
parente861016a8e5b47500a615cfc9bbcfad85e5d36ba (diff)
downloadrabbitmq-server-git-cce13da176ec2ccbb89b046b6f5352f253c1a990.tar.gz
Simplify reading message body.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl86
1 files changed, 40 insertions, 46 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 1c348c7f5b..f8d9508471 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -154,20 +154,20 @@
%% This is the size of the message record embedded in the queue
%% index. If 0, the message can be found in the message store.
--define(MSG_IN_INDEX_SIZE_BYTES, 4).
--define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)).
+-define(EMBEDDED_SIZE_BYTES, 4).
+-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+%% + 4 for size
+-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).
-%% 16 bytes for md5sum + 8 for expiry + 4 for size
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES +
- ?MSG_IN_INDEX_SIZE_BYTES)).
%% + 2 for seq, bits and prefix
-define(PUB_RECORD_PREFIX_BYTES, 2).
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)).
-
%% ---- misc ----
--define(PUB, {_, _}). %% {IsPersistent, Body}
+-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}
-define(READ_MODE, [binary, raw, read]).
-define(WRITE_MODE, [write | ?READ_MODE]).
@@ -511,9 +511,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{IsPersistent, Body}, Del, no_ack},
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
{SegmentAndDirtyCount, Bytes}) ->
- {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body),
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
{recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
Del, RelSeq, SegmentAndDirtyCount),
Bytes + case IsPersistent of
@@ -607,34 +607,25 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
case MsgOrId of
MsgId when is_binary(MsgId) ->
[MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
- <<0:?MSG_IN_INDEX_SIZE_BITS>>];
+ <<0:?EMBEDDED_SIZE_BITS>>];
#basic_message{id = MsgId} ->
MsgBin = term_to_binary(MsgOrId),
MsgBinSize = size(MsgBin),
[MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
- <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin]
+ <<MsgBinSize:?EMBEDDED_SIZE_BITS>>, MsgBin]
end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_pub_record_body(<<_:?MSG_ID_BITS, _:?EXPIRY_BITS,
- _:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>> = Bin,
- Hdl) ->
- case IndexSize of
- 0 -> {ok, [Bin]};
- _ -> case file_handle_cache:read(Hdl, IndexSize) of
- {ok, MsgBin} -> {ok, [Bin, MsgBin]};
- Else -> Else
- end
- end.
-
-parse_pub_record_body(IoList) when is_list(IoList) ->
- parse_pub_record_body(iolist_to_binary(IoList));
+read_msg(0, _Hdl) -> {ok, none};
+read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of
+ {ok, MsgBin} -> {ok, MsgBin};
+ Else -> Else
+ end.
parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS, _:?MSG_IN_INDEX_SIZE_BITS,
- Rest/binary>>) ->
+ Size:?SIZE_BITS>>, MsgBin) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -643,10 +634,10 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
X -> X
end,
size = Size},
- case Rest of
- <<>> -> {ok, MsgId, Props};
- _ -> Msg = #basic_message{id = MsgId} = binary_to_term(Rest),
- {ok, Msg, Props}
+ case MsgBin of
+ none -> {MsgId, Props};
+ _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
+ {Msg, Props}
end.
%%----------------------------------------------------------------------------
@@ -775,14 +766,15 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
%% Journal entry composed only of zeroes was probably
%% produced during a dirty shutdown so stop reading
- {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
+ {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} ->
State;
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- case read_pub_record_body(Bin, Hdl) of
- {ok, Body} ->
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS>>} ->
+ case read_msg(MsgSize, Hdl) of
+ {ok, MsgBin} ->
IsPersistent =
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
@@ -790,7 +782,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
end,
load_journal_entries(
add_to_journal(
- SeqId, {IsPersistent, Body}, State));
+ SeqId, {IsPersistent, Bin, MsgBin},
+ State));
_ErrOrEoF ->
State
end;
@@ -944,8 +937,8 @@ segment_entries_foldr(Fun, Init,
{SegEntries, _UnackedCount} = load_segment(false, Segment),
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
array:sparse_foldr(
- fun (RelSeq, {{IsPersistent, Body}, Del, Ack}, Acc) ->
- {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body),
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
end, Init, SegEntries1).
@@ -981,11 +974,12 @@ load_segment_entries(Hdl, KeepAcked, Acc) ->
end.
load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
- case read_pub_record_body(PubRecordBody, Hdl) of
- {ok, Body} ->
- Obj = {{IsPersistent, Body}, no_del, no_ack},
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS>>} ->
+ case read_msg(MsgSize, Hdl) of
+ {ok, MsgBin} ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
{SegEntries1, Unacked + 1};
_ ->
@@ -1225,7 +1219,7 @@ store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
Rest/binary>>) ->
{<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
- 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
store_msg_journal(_) ->
stop.
@@ -1234,7 +1228,7 @@ store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
{<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
- 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,