summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl42
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73bb6e1983..0c9995b978 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -434,8 +434,6 @@ publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
publish_delivered(true, Msg = #basic_message { guid = Guid,
is_persistent = IsPersistent },
State = #vqstate { len = 0,
- index_state = IndexState,
- msg_store_clients = MSCState,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
@@ -446,24 +444,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
- {MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ PA1 = record_pending_ack(MsgStatus1, PA),
PCount1 = maybe_inc(PCount, IsPersistent1),
- State1 = State #vqstate { msg_store_clients = MSCState1,
- persistent_count = PCount1,
- next_seq_id = SeqId + 1,
+ {SeqId, State1 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
- in_counter = InCount + 1 },
- {SeqId,
- case MsgStatus1 #msg_status.msg_on_disk of
- true -> {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- PA1 = dict:store(SeqId, {true, Guid}, PA),
- State1 #vqstate { index_state = IndexState1,
- pending_ack = PA1 };
- false -> PA1 = dict:store(SeqId, MsgStatus1, PA),
- State1 #vqstate { pending_ack = PA1 }
- end}.
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1 }}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -524,13 +512,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
%% 4. If an ack is required, add something sensible to PA
PA1 = case AckRequired of
- true -> Entry =
- case MsgOnDisk of
- true -> {IsPersistent, Guid};
- false -> MsgStatus #msg_status {
- is_delivered = true }
- end,
- dict:store(SeqId, Entry, PA);
+ true -> record_pending_ack(MsgStatus #msg_status {
+ is_delivered = true }, PA);
false -> PA
end,
@@ -708,6 +691,15 @@ maybe_inc(N, false) -> N.
maybe_dec(N, true ) -> N - 1;
maybe_dec(N, false) -> N.
+record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ AckEntry = case MsgOnDisk of
+ true -> {IsPersistent, Guid};
+ false -> MsgStatus
+ end,
+ dict:store(SeqId, AckEntry, PA).
+
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->