summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl223
-rw-r--r--src/rabbit_variable_queue.erl56
2 files changed, 181 insertions, 98 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d441..90729e334c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -128,8 +128,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,26 +140,37 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(MSG_IN_INDEX_SIZE_BYTES, 4).
+-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)).
+
%% 16 bytes for md5sum + 8 for expiry + 4 for size
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES +
+ ?MSG_IN_INDEX_SIZE_BYTES)).
%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+-define(PUB_RECORD_PREFIX_BYTES, 2).
+
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+%% %% 1 publish, 1 deliver, 1 ack per msg
+%% -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
+%% (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
%% ---- misc ----
-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(READ_AHEAD_MODE, ?READ_MODE).
-define(WRITE_MODE, [write | ?READ_MODE]).
+-define(READ_BUFFER_SIZE, 1048576). %% 1MB
+
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
@@ -219,7 +230,8 @@
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
- boolean(), boolean()}], qistate()}).
+ boolean(), boolean()}],
+ non_neg_integer(), non_neg_integer(), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -267,9 +279,12 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unconfirmed = Unconfirmed }) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
@@ -284,9 +299,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ create_pub_record_body(MsgOrId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -323,11 +338,12 @@ read(Start, End, State = #qistate { segments = Segments,
%% Start is inclusive, End is exclusive.
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
- {Messages, Segments1} =
+ {Messages, {BodiesRead, BodyBytesRead}, Segments1} =
lists:foldr(fun (Seg, Acc) ->
read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
- end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
- {Messages, State #qistate { segments = Segments1 }}.
+ end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, BodiesRead, BodyBytesRead,
+ State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -541,7 +557,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +572,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +585,42 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<0:?MSG_IN_INDEX_SIZE_BITS>>];
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ MsgBinSize = size(MsgBin),
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin]
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>,
+ Hdl) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case IndexSize of
+ 0 -> {MsgId, Props};
+ _ -> case file_handle_cache:read(Hdl, IndexSize) of
+ {ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
+ binary_to_term(MsgBin),
+ {Msg, Props};
+ _ -> exit(could_not_read)
+ end
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -719,14 +754,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
{ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ {MsgOrId, Props} = read_pub_record_body(Bin, Hdl),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end,
load_journal_entries(
add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
+ SeqId, {MsgOrId, Props, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -829,12 +864,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {MsgId, MsgProps, IsPersistent} ->
+ {MsgOrId, MsgProps, IsPersistent} ->
+ %% Body = create_pub_record_body(MsgOrId, MsgProps),
+ %% io:format("pub ~p~n",
+ %% [[{persist, IsPersistent},
+ %% {relseq, RelSeq},
+ %% {body, Body}]]),
+ %% io:format("write ~p~n",
+ %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ %% (bool_to_int(IsPersistent)):1,
+ %% RelSeq:?REL_SEQ_BITS>>,
+ %% Body])]),
file_handle_cache:append(
Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
+ create_pub_record_body(MsgOrId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -851,18 +896,21 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
Hdl.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
- {Messages, Segments}, Dir) ->
+ {Messages, BodyReadCounts, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
- {segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
- when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
- (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
- (_RelSeq, _Value, Acc) ->
- Acc
- end, Messages, Segment),
- segment_store(Segment, Segments)}.
+ {Messages1, BodyReadCounts1} =
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ {Acc, BodyReadAcc})
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc],
+ incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)};
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, {Messages, BodyReadCounts}, Segment),
+ {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}.
segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
@@ -870,6 +918,12 @@ segment_entries_foldr(Fun, Init,
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
array:sparse_foldr(Fun, Init, SegEntries1).
+incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) ->
+ Counts;
+incr_body_read_counts(#basic_message{}, #message_properties{size = Size},
+ {BodiesRead, BodyBytesRead}) ->
+ {BodiesRead + 1, BodyBytesRead + Size}.
+
%% Loading segments
%%
%% Does not do any combining with the journal at all.
@@ -877,44 +931,50 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(
+ Path, ?READ_AHEAD_MODE,
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ Res = load_segment_entries(Hdl, KeepAcked, Empty),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+load_segment_entries(Hdl, KeepAcked, Acc) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
+ {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked,
+ load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
+ {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+ eof -> %% TODO or maybe _
+ Acc
+ end.
+
+load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
+ {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl),
+ Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ {SegEntries1, Unacked + 1};
+ _ ->
+ {SegEntries, Unacked}
+ end.
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1124,6 +1184,8 @@ store_msg_size_segment(_) ->
%%----------------------------------------------------------------------------
+%% TODO here?
+
foreach_queue_index(Funs) ->
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
@@ -1157,7 +1219,8 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, [{read_ahead, Size} | ?READ_MODE],
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534e6..41c556522e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -362,7 +362,7 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
- msgs_on_disk :: gb_sets:set(),
+ msgs_on_disk :: gb_sets:set(), %% TODO fix confirms!
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
@@ -645,7 +645,7 @@ fetch(AckRequired, State) ->
%% at this point, so read it in.
{Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
- {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, State3}
end.
drop(AckRequired, State) ->
@@ -963,9 +963,19 @@ msg_status(IsPersistent, IsDelivered, SeqId,
index_on_disk = false,
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -973,7 +983,7 @@ beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1002,21 +1012,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:read(MsgId, MSCState1)
+ exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1)
end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
- rabbit_msg_store:remove(MsgIds, MCSState1)
+ ok %% rabbit_msg_store:remove(MsgIds, MCSState1)
end).
msg_store_close_fds(MSCState, IsPersistent) ->
@@ -1038,7 +1048,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -1308,11 +1318,11 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
+ %% Msg1 = Msg #basic_message {
+ %% %% don't persist any recoverable decoded properties
+ %% content = rabbit_binary_parser:clear_decoded_content(
+ %% Msg #basic_message.content)},
+ %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1322,6 +1332,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
@@ -1329,8 +1340,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
+ Msg1, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1575,7 +1590,8 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ {List, _, _, IndexState1} =
+ rabbit_queue_index:read(SeqId, SeqId1, IndexState),
next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
@@ -1744,6 +1760,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
@@ -1753,11 +1771,13 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
+ {List, RamCountDelta, RamBytesDelta, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountDelta,
+ ram_bytes = RamBytes + RamBytesDelta },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being