diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 12:56:04 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 12:56:04 +0100 |
| commit | 267192debdade3bea25e50cf9e74efa8ee3a2337 (patch) | |
| tree | 241cf9b505cb00eb4ac27c2014a68c7369cc9e12 | |
| parent | 0fcd1609386fb7b487e00173099d23a118e616b3 (diff) | |
| download | rabbitmq-server-git-267192debdade3bea25e50cf9e74efa8ee3a2337.tar.gz | |
refactor: simplify publish_delivered
- extract recoding of pending acks
- use maybe_write_to_disk
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73bb6e1983..0c9995b978 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -434,8 +434,6 @@ publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> publish_delivered(true, Msg = #basic_message { guid = Guid, is_persistent = IsPersistent }, State = #vqstate { len = 0, - index_state = IndexState, - msg_store_clients = MSCState, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, @@ -446,24 +444,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + PA1 = record_pending_ack(MsgStatus1, PA), PCount1 = maybe_inc(PCount, IsPersistent1), - State1 = State #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount1, - next_seq_id = SeqId + 1, + {SeqId, State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, - in_counter = InCount + 1 }, - {SeqId, - case MsgStatus1 #msg_status.msg_on_disk of - true -> {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(false, MsgStatus1, IndexState), - PA1 = dict:store(SeqId, {true, Guid}, PA), - State1 #vqstate { index_state = IndexState1, - pending_ack = PA1 }; - false -> PA1 = dict:store(SeqId, MsgStatus1, PA), - State1 #vqstate { pending_ack = PA1 } - end}. + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 }}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -524,13 +512,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, %% 4. If an ack is required, add something sensible to PA PA1 = case AckRequired of - true -> Entry = - case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus #msg_status { - is_delivered = true } - end, - dict:store(SeqId, Entry, PA); + true -> record_pending_ack(MsgStatus #msg_status { + is_delivered = true }, PA); false -> PA end, @@ -708,6 +691,15 @@ maybe_inc(N, false) -> N. maybe_dec(N, true ) -> N - 1; maybe_dec(N, false) -> N. +record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + AckEntry = case MsgOnDisk of + true -> {IsPersistent, Guid}; + false -> MsgStatus + end, + dict:store(SeqId, AckEntry, PA). + remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState }) -> |
