diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 125 |
2 files changed, 77 insertions, 49 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8ad5558387..6ab370b2ab 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -250,7 +250,6 @@ init(Name, MsgStoreRecovered) -> %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also %% find the number of unacked messages. - AllSegs = CleanShutdown = detect_clean_shutdown(Dir), %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8585e1392c..4a4ba999ff 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -254,6 +254,9 @@ -define(BLANK_DELTA, #delta { start_seq_id = undefined, count = 0, end_seq_id = undefined }). +-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, + count = 0, + end_seq_id = Z }). %%---------------------------------------------------------------------------- %% Public API @@ -268,11 +271,11 @@ init(QueueName, PersistentStore) -> rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), - Delta = case DeltaCount1 of - 0 -> ?BLANK_DELTA; - _ -> #delta { start_seq_id = DeltaSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = DeltaSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } end, Now = now(), State = @@ -666,19 +669,39 @@ persistent_msg_ids(Pubs) -> [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, Obj #basic_message.is_persistent]. -betas_from_segment_entries(List, SeqIdLimit, TransientThreshold) -> - bpqueue:from_list([{true, - [#msg_status { msg = undefined, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true - } - || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, - SeqId < SeqIdLimit, - (IsPersistent orelse SeqId >= TransientThreshold)]}]). +betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> + {Filtered, IndexState1} = + lists:foldr( + fun ({MsgId, SeqId, IsPersistent, IsDelivered}, + {FilteredAcc, IndexStateAcc}) -> + case SeqId < TransientThreshold andalso not IsPersistent of + true -> + IndexStateAcc1 = + case IsDelivered of + false -> rabbit_queue_index:write_delivered( + SeqId, IndexStateAcc); + true -> IndexStateAcc + end, + {FilteredAcc, rabbit_queue_index:write_acks( + [SeqId], IndexStateAcc1)}; + false -> + case SeqId < SeqIdLimit of + true -> + {[#msg_status { msg = undefined, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + } | FilteredAcc], + IndexStateAcc}; + false -> + {FilteredAcc, IndexStateAcc} + end + end + end, {[], IndexState}, List), + {bpqueue:from_list([{true, Filtered}]), IndexState1}. read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -693,10 +716,10 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> rabbit_binary_generator:ensure_content_encoded(Content)) }. %% the first arg is the older delta -combine_deltas(#delta { count = 0 }, #delta { count = 0 }) -> +combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> ?BLANK_DELTA; -combine_deltas(#delta { count = 0 }, #delta { } = B) -> B; -combine_deltas(#delta { } = A, #delta { count = 0 }) -> A; +combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { } = B) -> B; +combine_deltas(#delta { } = A, ?BLANK_DELTA_PATTERN(Y)) -> A; combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow}, #delta { start_seq_id = SeqIdHigh, count = CountHigh, end_seq_id = SeqIdEnd }) -> @@ -748,14 +771,15 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId, delete1(PersistentStore, TransientThreshold, NextSeqId, Count, Delta1SeqId, IndexState1); {List, IndexState1} -> - Q = betas_from_segment_entries(List, Delta1SeqId, - TransientThreshold), - {QCount, IndexState2} = + {Q, IndexState2} = + betas_from_segment_entries( + List, Delta1SeqId, TransientThreshold, IndexState1), + {QCount, IndexState3} = remove_queue_entries( PersistentStore, fun beta_fold_no_index_on_disk/3, - Q, IndexState1), + Q, IndexState2), delete1(PersistentStore, TransientThreshold, NextSeqId, - Count + QCount, Delta1SeqId, IndexState2) + Count + QCount, Delta1SeqId, IndexState3) end. purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, @@ -1132,14 +1156,14 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State = {Qa, Reduction1, State #vqstate { index_state = IndexState1, ram_index_count = RamIndexCount1 }}. -maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) -> +maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; maybe_deltas_to_betas( State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, target_ram_msg_count = TargetRamMsgCount, - delta = #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd }, + delta = Delta = #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd }, transient_threshold = TransientThreshold}) -> case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of true -> @@ -1150,28 +1174,33 @@ maybe_deltas_to_betas( %% really be holding all the betas in memory. {List, IndexState1, Delta1SeqId} = read_index_segment(DeltaSeqId, IndexState), - State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. It %% could be [] if we ignored every message in the segment %% due to it being transient and below the threshold - Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd, TransientThreshold), - Q3b = bpqueue:join(Q3, Q3a), - case DeltaCount - bpqueue:len(Q3a) of + {Q3a, IndexState2} = + betas_from_segment_entries( + List, DeltaSeqIdEnd, TransientThreshold, IndexState1), + State1 = State #vqstate { index_state = IndexState2 }, + case bpqueue:len(Q3a) of 0 -> - %% delta is now empty, but it wasn't before, so - %% can now join q2 onto q3 - State1 #vqstate { delta = ?BLANK_DELTA, - q2 = bpqueue:new(), - q3 = bpqueue:join(Q3b, Q2) }; - N when N > 0 -> - State2 = State1 #vqstate { - q3 = Q3b, - delta = #delta { start_seq_id = Delta1SeqId, - count = N, - end_seq_id = DeltaSeqIdEnd } }, - case N == DeltaCount of - true -> maybe_deltas_to_betas(State2); - false -> State2 + maybe_deltas_to_betas( + State #vqstate { + delta = Delta #delta { start_seq_id = Delta1SeqId }}); + _ -> + Q3b = bpqueue:join(Q3, Q3a), + case DeltaCount - bpqueue:len(Q3a) of + 0 -> + %% delta is now empty, but it wasn't + %% before, so can now join q2 onto q3 + State1 #vqstate { delta = ?BLANK_DELTA, + q2 = bpqueue:new(), + q3 = bpqueue:join(Q3b, Q2) }; + N when N > 0 -> + State1 #vqstate { + q3 = Q3b, + delta = #delta { start_seq_id = Delta1SeqId, + count = N, + end_seq_id = DeltaSeqIdEnd } } end end end. |
