diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73bb6e1983..0c9995b978 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -434,8 +434,6 @@ publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> publish_delivered(true, Msg = #basic_message { guid = Guid, is_persistent = IsPersistent }, State = #vqstate { len = 0, - index_state = IndexState, - msg_store_clients = MSCState, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, @@ -446,24 +444,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + PA1 = record_pending_ack(MsgStatus1, PA), PCount1 = maybe_inc(PCount, IsPersistent1), - State1 = State #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount1, - next_seq_id = SeqId + 1, + {SeqId, State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, - in_counter = InCount + 1 }, - {SeqId, - case MsgStatus1 #msg_status.msg_on_disk of - true -> {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(false, MsgStatus1, IndexState), - PA1 = dict:store(SeqId, {true, Guid}, PA), - State1 #vqstate { index_state = IndexState1, - pending_ack = PA1 }; - false -> PA1 = dict:store(SeqId, MsgStatus1, PA), - State1 #vqstate { pending_ack = PA1 } - end}. + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 }}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -524,13 +512,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, %% 4. If an ack is required, add something sensible to PA PA1 = case AckRequired of - true -> Entry = - case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus #msg_status { - is_delivered = true } - end, - dict:store(SeqId, Entry, PA); + true -> record_pending_ack(MsgStatus #msg_status { + is_delivered = true }, PA); false -> PA end, @@ -708,6 +691,15 @@ maybe_inc(N, false) -> N. maybe_dec(N, true ) -> N - 1; maybe_dec(N, false) -> N. +record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + AckEntry = case MsgOnDisk of + true -> {IsPersistent, Guid}; + false -> MsgStatus + end, + dict:store(SeqId, AckEntry, PA). + remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> |
