diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0c9995b978..3a283bbf6e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -431,8 +431,7 @@ publish(Msg, State) -> publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid, - is_persistent = IsPersistent }, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -441,9 +440,8 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, - is_delivered = true, msg_on_disk = false, index_on_disk = false }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(MsgStatus1, PA), PCount1 = maybe_inc(PCount, IsPersistent1), @@ -532,12 +530,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, ack(AckTags, State) -> ack(fun (_AckEntry, State1) -> State1 end, AckTags, State). -tx_publish(Txn, - Msg = #basic_message { is_persistent = true, guid = Guid }, +tx_publish(Txn, Msg = #basic_message { is_persistent = true }, State = #vqstate { msg_store_clients = MSCState, durable = true }) -> - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, - is_delivered = false, msg_on_disk = false, index_on_disk = false }, + MsgStatus = msg_status(true, undefined, Msg), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), publish_in_tx(Txn, Msg), @@ -691,6 +686,11 @@ maybe_inc(N, false) -> N. maybe_dec(N, true ) -> N - 1; maybe_dec(N, false) -> N. +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> + #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, + is_persistent = IsPersistent, is_delivered = false, + msg_on_disk = false, index_on_disk = false }. + record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> @@ -1137,7 +1137,7 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, end end. -publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, +publish(Msg = #basic_message { is_persistent = IsPersistent }, IsDelivered, MsgOnDisk, State = #vqstate { next_seq_id = SeqId, len = Len, @@ -1145,10 +1145,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, persistent_count = PCount, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = false }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, PCount1 = maybe_inc(PCount, IsPersistent1), {SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus, State #vqstate { next_seq_id = SeqId + 1, |
