summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl40
1 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index fc7624478a..2d7a8befde 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -964,12 +964,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
when Start + Count =< End ->
Delta.
-m(MsgStatus = #msg_status { msg = Msg,
- is_persistent = IsPersistent,
+m(MsgStatus = #msg_status { is_persistent = IsPersistent,
msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk }) ->
true = (not IsPersistent) or IndexOnDisk,
- true = (Msg =/= undefined) or MsgInStore,
+ true = msg_in_ram(MsgStatus) or MsgInStore,
MsgStatus.
one_if(true ) -> 1;
@@ -1089,7 +1088,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1], RRC, RB};
false -> MsgStatus = m(beta_msg_status(M)),
- HaveMsg = MsgStatus#msg_status.msg =/= undefined,
+ HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
case (gb_trees:is_defined(SeqId, RPA) orelse
gb_trees:is_defined(SeqId, DPA) orelse
@@ -1260,7 +1259,6 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
- msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_in_store = MsgInStore,
@@ -1298,7 +1296,7 @@ remove(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+ RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)),
State2 = case AckRequired of
false -> upd_bytes(-1, 0, MsgStatus, State1);
true -> upd_bytes(-1, 1, MsgStatus, State1)
@@ -1338,16 +1336,16 @@ remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
- msg_props = #message_properties { size = Size } },
+ msg_props = #message_properties { size = Size } } = MsgStatus,
{MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
{case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytesDec + Size * one_if(Msg =/= undefined),
+ RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)),
PCountDec + one_if(IsPersistent),
PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -1423,7 +1421,7 @@ prepare_to_store(Msg) ->
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg,
+record_pending_ack(#msg_status { seq_id = SeqId,
msg_props = MsgProps } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
@@ -1431,10 +1429,10 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg,
ack_in_counter = AckInCount}) ->
Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
{RPA1, DPA1, QPA1} =
- case {Msg, persist_to(MsgProps)} of
- {undefined, _} -> {RPA, Insert(DPA), QPA};
- {_, queue_index} -> {RPA, DPA, Insert(QPA)};
- {_, msg_store} -> {Insert(RPA), DPA, QPA}
+ case {msg_in_ram(MsgStatus), persist_to(MsgProps)} of
+ {false, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
@@ -1937,10 +1935,10 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = case MsgStatus2#msg_status.msg of
- undefined -> upd_ram_bytes_count(
- -1, MsgStatus2, State1);
- _ -> State1
+ State2 = case msg_in_ram(MsgStatus2) of
+ false -> upd_ram_bytes_count(
+ -1, MsgStatus2, State1);
+ true -> State1
end,
State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
@@ -1995,9 +1993,9 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
- State1 = case MsgStatus#msg_status.msg of
- undefined -> State;
- _ -> upd_ram_bytes_count(-1, MsgStatus, State)
+ State1 = case msg_in_ram(MsgStatus) of
+ false -> State;
+ true -> upd_ram_bytes_count(-1, MsgStatus, State)
end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,