diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 75 |
1 files changed, 43 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d399e82010..00ffef2044 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1014,10 +1014,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk} of - {false, true, false} -> Rem(), IndexState1; - {false, true, true} -> Rem(), Ack(); - _ -> IndexState1 + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of + {false, true, false, _, _} -> Rem(), IndexState1; + {false, true, true, _, _} -> Rem(), Ack(); + { true, true, true, false, false} -> Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1187,7 +1188,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgIdsByStore, _AllMsgIds} = + {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1199,7 +1200,7 @@ remove_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState), + rabbit_queue_index:ack(PersistentSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } @@ -1208,7 +1209,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{MsgIdsByStore, AllMsgIds}, + {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1223,7 +1224,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(AckTags, IndexState), + IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( @@ -1233,17 +1234,18 @@ ack(MsgStoreFun, Fun, AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false, msg_id = MsgId }, - {MsgIdsByStore, AllMsgIds}) -> - {MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, - {MsgIdsByStore, AllMsgIds}) -> - {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; +accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> @@ -1348,29 +1350,34 @@ requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA, publish_r(MsgStatus = #msg_status { seq_id = SeqId, msg = Msg, - index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, - q4 = Q4, + index_on_disk = IndexOnDisk, + msg_on_disk = MsgOnDisk }, + State = #vqstate { q4 = Q4, delta = Delta, len = Len, in_counter = InCounter, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> (case pick_store(SeqId, State) of - q4 -> case Msg of - undefined -> - {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - State1 #vqstate { q4 = gb_trees:insert( - SeqId, MsgStatus1, Q4a)}; - #basic_message{} -> - State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, - Q4), - ram_msg_count = RamMsgCount + 1 } - end; - q3 -> State #vqstate { - q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk), + q4 -> case Msg of + undefined -> + {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = gb_trees:insert( + SeqId, MsgStatus1, Q4a)}; + #basic_message{} -> + State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, Q4), + ram_msg_count = RamMsgCount + 1 } + end; + q3 -> %% make sure index is on disk + {#msg_status { index_on_disk = IndexOnDisk1 } = MsgStatus1, + #vqstate { q3 = Q3 } = State1} = + maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), + State1 #vqstate { + q3 = q3tree:in_r(IndexOnDisk1, + MsgStatus1, + Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }; delta -> #delta { start_seq_id = StartSeqId, @@ -1379,7 +1386,11 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId, Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId), count = Count + 1, end_seq_id = max(SeqId + 1, EndSeqId)}, - State #vqstate { delta = Delta1 } + %% make sure the index and msg are on disk + {_MsgStatus, State1} = maybe_write_to_disk( + not MsgOnDisk, not IndexOnDisk, + MsgStatus, State), + State1 #vqstate { delta = Delta1 } end) #vqstate { len = Len + 1, in_counter = InCounter + 1 }. |
