summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-03 15:52:49 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-03 15:52:49 +0000
commit13138e9a7c9a8ef29ce1ed092d3cd5289684a3af (patch)
tree83b8381df55df4d435639bc4e426865f89fad283 /src
parent19450507c7eed92dc8c4a8c264faadf797cf040c (diff)
downloadrabbitmq-server-git-13138e9a7c9a8ef29ce1ed092d3cd5289684a3af.tar.gz
Hacked-together implementation of persisting messages in the QI. Currently does it for all messages, but in reality we'd only want to do this for small ones (and make it configurable). Confirms are probably broken, maybe some other things.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl223
-rw-r--r--src/rabbit_variable_queue.erl56
2 files changed, 181 insertions, 98 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d441..90729e334c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -128,8 +128,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,26 +140,37 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(MSG_IN_INDEX_SIZE_BYTES, 4).
+-define(MSG_IN_INDEX_SIZE_BITS, (?MSG_IN_INDEX_SIZE_BYTES * 8)).
+
%% 16 bytes for md5sum + 8 for expiry + 4 for size
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES +
+ ?MSG_IN_INDEX_SIZE_BYTES)).
%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+-define(PUB_RECORD_PREFIX_BYTES, 2).
+
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + ?PUB_RECORD_PREFIX_BYTES)).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+%% %% 1 publish, 1 deliver, 1 ack per msg
+%% -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
+%% (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
%% ---- misc ----
-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(READ_AHEAD_MODE, ?READ_MODE).
-define(WRITE_MODE, [write | ?READ_MODE]).
+-define(READ_BUFFER_SIZE, 1048576). %% 1MB
+
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
@@ -219,7 +230,8 @@
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
- boolean(), boolean()}], qistate()}).
+ boolean(), boolean()}],
+ non_neg_integer(), non_neg_integer(), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -267,9 +279,12 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unconfirmed = Unconfirmed }) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
@@ -284,9 +299,9 @@ publish(MsgId, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ create_pub_record_body(MsgOrId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgOrId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -323,11 +338,12 @@ read(Start, End, State = #qistate { segments = Segments,
%% Start is inclusive, End is exclusive.
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
- {Messages, Segments1} =
+ {Messages, {BodiesRead, BodyBytesRead}, Segments1} =
lists:foldr(fun (Seg, Acc) ->
read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
- end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
- {Messages, State #qistate { segments = Segments1 }}.
+ end, {[], {0, 0}, Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, BodiesRead, BodyBytesRead,
+ State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -541,7 +557,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +572,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +585,42 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<0:?MSG_IN_INDEX_SIZE_BITS>>];
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ MsgBinSize = size(MsgBin),
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>,
+ <<MsgBinSize:?MSG_IN_INDEX_SIZE_BITS>>, MsgBin]
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+read_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS, IndexSize:?MSG_IN_INDEX_SIZE_BITS>>,
+ Hdl) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case IndexSize of
+ 0 -> {MsgId, Props};
+ _ -> case file_handle_cache:read(Hdl, IndexSize) of
+ {ok, MsgBin} -> Msg = #basic_message{id = MsgId} =
+ binary_to_term(MsgBin),
+ {Msg, Props};
+ _ -> exit(could_not_read)
+ end
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -719,14 +754,14 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
{ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
State;
{ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ {MsgOrId, Props} = read_pub_record_body(Bin, Hdl),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end,
load_journal_entries(
add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
+ SeqId, {MsgOrId, Props, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -829,12 +864,22 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {MsgId, MsgProps, IsPersistent} ->
+ {MsgOrId, MsgProps, IsPersistent} ->
+ %% Body = create_pub_record_body(MsgOrId, MsgProps),
+ %% io:format("pub ~p~n",
+ %% [[{persist, IsPersistent},
+ %% {relseq, RelSeq},
+ %% {body, Body}]]),
+ %% io:format("write ~p~n",
+ %% [iolist_to_binary([<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ %% (bool_to_int(IsPersistent)):1,
+ %% RelSeq:?REL_SEQ_BITS>>,
+ %% Body])]),
file_handle_cache:append(
Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
+ create_pub_record_body(MsgOrId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -851,18 +896,21 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
Hdl.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
- {Messages, Segments}, Dir) ->
+ {Messages, BodyReadCounts, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
- {segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
- when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
- (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
- (_RelSeq, _Value, Acc) ->
- Acc
- end, Messages, Segment),
- segment_store(Segment, Segments)}.
+ {Messages1, BodyReadCounts1} =
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ {Acc, BodyReadAcc})
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ {[{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc],
+ incr_body_read_counts(MsgOrId, MsgProps, BodyReadAcc)};
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, {Messages, BodyReadCounts}, Segment),
+ {Messages1, BodyReadCounts1, segment_store(Segment, Segments)}.
segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
@@ -870,6 +918,12 @@ segment_entries_foldr(Fun, Init,
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
array:sparse_foldr(Fun, Init, SegEntries1).
+incr_body_read_counts(MsgId, _MsgProps, Counts) when is_binary(MsgId) ->
+ Counts;
+incr_body_read_counts(#basic_message{}, #message_properties{size = Size},
+ {BodiesRead, BodyBytesRead}) ->
+ {BodiesRead + 1, BodyBytesRead + Size}.
+
%% Loading segments
%%
%% Does not do any combining with the journal at all.
@@ -877,44 +931,50 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(
+ Path, ?READ_AHEAD_MODE,
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ Res = load_segment_entries(Hdl, KeepAcked, Empty),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+load_segment_entries(Hdl, KeepAcked, Acc) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
+ {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked,
+ load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
+ {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>} ->
+ load_segment_entries(
+ Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+ eof -> %% TODO or maybe _
+ Acc
+ end.
+
+load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, <<PubRecordBody:?PUB_RECORD_BODY_BYTES/binary>>} ->
+ {MsgOrId, MsgProps} = read_pub_record_body(PubRecordBody, Hdl),
+ Obj = {{MsgOrId, MsgProps, IsPersistent}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ {SegEntries1, Unacked + 1};
+ _ ->
+ {SegEntries, Unacked}
+ end.
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1124,6 +1184,8 @@ store_msg_size_segment(_) ->
%%----------------------------------------------------------------------------
+%% TODO here?
+
foreach_queue_index(Funs) ->
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
@@ -1157,7 +1219,8 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, [{read_ahead, Size} | ?READ_MODE],
+ [{read_buffer, ?READ_BUFFER_SIZE}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534e6..41c556522e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -362,7 +362,7 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
- msgs_on_disk :: gb_sets:set(),
+ msgs_on_disk :: gb_sets:set(), %% TODO fix confirms!
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
@@ -645,7 +645,7 @@ fetch(AckRequired, State) ->
%% at this point, so read it in.
{Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
- {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, State3}
end.
drop(AckRequired, State) ->
@@ -963,9 +963,19 @@ msg_status(IsPersistent, IsDelivered, SeqId,
index_on_disk = false,
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -973,7 +983,7 @@ beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1002,21 +1012,21 @@ msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ ok %% rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:read(MsgId, MSCState1)
+ exit(nah) %% rabbit_msg_store:read(MsgId, MSCState1)
end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
- rabbit_msg_store:remove(MsgIds, MCSState1)
+ ok %% rabbit_msg_store:remove(MsgIds, MCSState1)
end).
msg_store_close_fds(MSCState, IsPersistent) ->
@@ -1038,7 +1048,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -1308,11 +1318,11 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
+ %% Msg1 = Msg #basic_message {
+ %% %% don't persist any recoverable decoded properties
+ %% content = rabbit_binary_parser:clear_decoded_content(
+ %% Msg #basic_message.content)},
+ %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1322,6 +1332,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
@@ -1329,8 +1340,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
+ Msg1, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1575,7 +1590,8 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ {List, _, _, IndexState1} =
+ rabbit_queue_index:read(SeqId, SeqId1, IndexState),
next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
@@ -1744,6 +1760,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
@@ -1753,11 +1771,13 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
+ {List, RamCountDelta, RamBytesDelta, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountDelta,
+ ram_bytes = RamBytes + RamBytesDelta },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being