summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:19:04 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-13 13:19:04 +0000
commite861016a8e5b47500a615cfc9bbcfad85e5d36ba (patch)
tree76bcf60bbc78e4fed8794f59916cb9fe0cd0333d /src
parent1ec1e79a2ecdaad03fa9335f74fbee3726eff79e (diff)
downloadrabbitmq-server-git-e861016a8e5b47500a615cfc9bbcfad85e5d36ba.tar.gz
Finish the job started by 2f668f7b457f; make sure we always hold messages in memory in the serialised form.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl58
1 files changed, 37 insertions, 21 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e36797a91a..1c348c7f5b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -34,7 +34,9 @@
%% metadata about the message; the delivery and acknowledgement
%% records just contain the sequence ID. A publish record may also
%% contain the complete message if provided to publish/5; this allows
-%% the message store to be avoided altogether for small messages.
+%% the message store to be avoided altogether for small messages. In
+%% either case the publish record is stored in memory in the same
+%% serialised format it will take on disk.
%%
%% Because of the fact that the queue can decide at any point to send
%% a queue entry to disk, you can not rely on publishes appearing in
@@ -91,7 +93,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{IsPersistent, Body}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -165,7 +167,7 @@
%% ---- misc ----
--define(PUB, {_, _}).
+-define(PUB, {_, _}). %% {IsPersistent, Body}
-define(READ_MODE, [binary, raw, read]).
-define(WRITE_MODE, [write | ?READ_MODE]).
@@ -509,9 +511,10 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ fun (RelSeq, {{IsPersistent, Body}, Del, no_ack},
{SegmentAndDirtyCount, Bytes}) ->
- {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body),
+ {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
Del, RelSeq, SegmentAndDirtyCount),
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
@@ -615,9 +618,23 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>,
+read_pub_record_body(<<_:?MSG_ID_BITS, _:?EXPIRY_BITS,
+ _:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>> = Bin,
Hdl) ->
+ case IndexSize of
+ 0 -> {ok, [Bin]};
+ _ -> case file_handle_cache:read(Hdl, IndexSize) of
+ {ok, MsgBin} -> {ok, [Bin, MsgBin]};
+ Else -> Else
+ end
+ end.
+
+parse_pub_record_body(IoList) when is_list(IoList) ->
+ parse_pub_record_body(iolist_to_binary(IoList));
+
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS, _:?MSG_IN_INDEX_SIZE_BITS,
+ Rest/binary>>) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -626,14 +643,10 @@ read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
X -> X
end,
size = Size},
- case IndexSize of
- 0 -> {ok, MsgId, Props};
- _ -> case file_handle_cache:read(Hdl, IndexSize) of
- {ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
- binary_to_term(MsgBin),
- {ok, Msg, Props};
- Else -> Else
- end
+ case Rest of
+ <<>> -> {ok, MsgId, Props};
+ _ -> Msg = #basic_message{id = MsgId} = binary_to_term(Rest),
+ {ok, Msg, Props}
end.
%%----------------------------------------------------------------------------
@@ -769,16 +782,15 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
case read_pub_record_body(Bin, Hdl) of
- {ok, MsgOrId, Props} ->
+ {ok, Body} ->
IsPersistent =
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end,
- B = create_pub_record_body(MsgOrId, Props),
load_journal_entries(
add_to_journal(
- SeqId, {IsPersistent, B}, State));
+ SeqId, {IsPersistent, Body}, State));
_ErrOrEoF ->
State
end;
@@ -931,7 +943,11 @@ segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
{SegEntries, _UnackedCount} = load_segment(false, Segment),
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
- array:sparse_foldr(Fun, Init, SegEntries1).
+ array:sparse_foldr(
+ fun (RelSeq, {{IsPersistent, Body}, Del, Ack}, Acc) ->
+ {ok, MsgOrId, MsgProps} = parse_pub_record_body(Body),
+ Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
+ end, Init, SegEntries1).
%% Loading segments
%%
@@ -968,8 +984,8 @@ load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
{ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
case read_pub_record_body(PubRecordBody, Hdl) of
- {ok, MsgOrId, MsgProps} ->
- Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ {ok, Body} ->
+ Obj = {{IsPersistent, Body}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
{SegEntries1, Unacked + 1};
_ ->