diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index fc7624478a..2d7a8befde 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -964,12 +964,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) when Start + Count =< End -> Delta. -m(MsgStatus = #msg_status { msg = Msg, - is_persistent = IsPersistent, +m(MsgStatus = #msg_status { is_persistent = IsPersistent, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }) -> true = (not IsPersistent) or IndexOnDisk, - true = (Msg =/= undefined) or MsgInStore, + true = msg_in_ram(MsgStatus) or MsgInStore, MsgStatus. one_if(true ) -> 1; @@ -1089,7 +1088,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1], RRC, RB}; false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = MsgStatus#msg_status.msg =/= undefined, + HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), case (gb_trees:is_defined(SeqId, RPA) orelse gb_trees:is_defined(SeqId, DPA) orelse @@ -1260,7 +1259,6 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, - msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_in_store = MsgInStore, @@ -1298,7 +1296,7 @@ remove(AckRequired, MsgStatus = #msg_status { end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)), State2 = case AckRequired of false -> upd_bytes(-1, 0, MsgStatus, State1); true -> upd_bytes(-1, 1, MsgStatus, State1) @@ -1338,16 +1336,16 @@ remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec}, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, - msg_props = #message_properties { size = Size } }, + msg_props = #message_properties { size = Size } } = MsgStatus, {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) -> {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytesDec + Size * one_if(Msg =/= undefined), + RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)), PCountDec + one_if(IsPersistent), PBytesDec + Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1423,7 +1421,7 @@ prepare_to_store(Msg) -> %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg, +record_pending_ack(#msg_status { seq_id = SeqId, msg_props = MsgProps } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, @@ -1431,10 +1429,10 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg, ack_in_counter = AckInCount}) -> Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, {RPA1, DPA1, QPA1} = - case {Msg, persist_to(MsgProps)} of - {undefined, _} -> {RPA, Insert(DPA), QPA}; - {_, queue_index} -> {RPA, DPA, Insert(QPA)}; - {_, msg_store} -> {Insert(RPA), DPA, QPA} + case {msg_in_ram(MsgStatus), persist_to(MsgProps)} of + {false, _} -> {RPA, Insert(DPA), QPA}; + {_, queue_index} -> {RPA, DPA, Insert(QPA)}; + {_, msg_store} -> {Insert(RPA), DPA, QPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, @@ -1937,10 +1935,10 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = case MsgStatus2#msg_status.msg of - undefined -> upd_ram_bytes_count( - -1, MsgStatus2, State1); - _ -> State1 + State2 = case msg_in_ram(MsgStatus2) of + false -> upd_ram_bytes_count( + -1, MsgStatus2, State1); + true -> State1 end, State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, @@ -1995,9 +1993,9 @@ push_betas_to_deltas1(Generator, Limit, Q, {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - State1 = case MsgStatus#msg_status.msg of - undefined -> State; - _ -> upd_ram_bytes_count(-1, MsgStatus, State) + State1 = case msg_in_ram(MsgStatus) of + false -> State; + true -> upd_ram_bytes_count(-1, MsgStatus, State) end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, |
