summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl1
-rw-r--r--src/rabbit_variable_queue.erl125
2 files changed, 77 insertions, 49 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8ad5558387..6ab370b2ab 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -250,7 +250,6 @@ init(Name, MsgStoreRecovered) ->
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
%% find the number of unacked messages.
- AllSegs =
CleanShutdown = detect_clean_shutdown(Dir),
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8585e1392c..4a4ba999ff 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -254,6 +254,9 @@
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }).
+-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
+ count = 0,
+ end_seq_id = Z }).
%%----------------------------------------------------------------------------
%% Public API
@@ -268,11 +271,11 @@ init(QueueName, PersistentStore) ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
- Delta = case DeltaCount1 of
- 0 -> ?BLANK_DELTA;
- _ -> #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
end,
Now = now(),
State =
@@ -666,19 +669,39 @@ persistent_msg_ids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
-betas_from_segment_entries(List, SeqIdLimit, TransientThreshold) ->
- bpqueue:from_list([{true,
- [#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
- SeqId < SeqIdLimit,
- (IsPersistent orelse SeqId >= TransientThreshold)]}]).
+betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
+ {Filtered, IndexState1} =
+ lists:foldr(
+ fun ({MsgId, SeqId, IsPersistent, IsDelivered},
+ {FilteredAcc, IndexStateAcc}) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true ->
+ IndexStateAcc1 =
+ case IsDelivered of
+ false -> rabbit_queue_index:write_delivered(
+ SeqId, IndexStateAcc);
+ true -> IndexStateAcc
+ end,
+ {FilteredAcc, rabbit_queue_index:write_acks(
+ [SeqId], IndexStateAcc1)};
+ false ->
+ case SeqId < SeqIdLimit of
+ true ->
+ {[#msg_status { msg = undefined,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ } | FilteredAcc],
+ IndexStateAcc};
+ false ->
+ {FilteredAcc, IndexStateAcc}
+ end
+ end
+ end, {[], IndexState}, List),
+ {bpqueue:from_list([{true, Filtered}]), IndexState1}.
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -693,10 +716,10 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
rabbit_binary_generator:ensure_content_encoded(Content)) }.
%% the first arg is the older delta
-combine_deltas(#delta { count = 0 }, #delta { count = 0 }) ->
+combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
?BLANK_DELTA;
-combine_deltas(#delta { count = 0 }, #delta { } = B) -> B;
-combine_deltas(#delta { } = A, #delta { count = 0 }) -> A;
+combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { } = B) -> B;
+combine_deltas(#delta { } = A, ?BLANK_DELTA_PATTERN(Y)) -> A;
combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow},
#delta { start_seq_id = SeqIdHigh, count = CountHigh,
end_seq_id = SeqIdEnd }) ->
@@ -748,14 +771,15 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId,
delete1(PersistentStore, TransientThreshold, NextSeqId, Count,
Delta1SeqId, IndexState1);
{List, IndexState1} ->
- Q = betas_from_segment_entries(List, Delta1SeqId,
- TransientThreshold),
- {QCount, IndexState2} =
+ {Q, IndexState2} =
+ betas_from_segment_entries(
+ List, Delta1SeqId, TransientThreshold, IndexState1),
+ {QCount, IndexState3} =
remove_queue_entries(
PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q, IndexState1),
+ Q, IndexState2),
delete1(PersistentStore, TransientThreshold, NextSeqId,
- Count + QCount, Delta1SeqId, IndexState2)
+ Count + QCount, Delta1SeqId, IndexState3)
end.
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
@@ -1132,14 +1156,14 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
{Qa, Reduction1, State #vqstate { index_state = IndexState1,
ram_index_count = RamIndexCount1 }}.
-maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) ->
+maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(
State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
target_ram_msg_count = TargetRamMsgCount,
- delta = #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd },
+ delta = Delta = #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd },
transient_threshold = TransientThreshold}) ->
case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of
true ->
@@ -1150,28 +1174,33 @@ maybe_deltas_to_betas(
%% really be holding all the betas in memory.
{List, IndexState1, Delta1SeqId} =
read_index_segment(DeltaSeqId, IndexState),
- State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. It
%% could be [] if we ignored every message in the segment
%% due to it being transient and below the threshold
- Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd, TransientThreshold),
- Q3b = bpqueue:join(Q3, Q3a),
- case DeltaCount - bpqueue:len(Q3a) of
+ {Q3a, IndexState2} =
+ betas_from_segment_entries(
+ List, DeltaSeqIdEnd, TransientThreshold, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2 },
+ case bpqueue:len(Q3a) of
0 ->
- %% delta is now empty, but it wasn't before, so
- %% can now join q2 onto q3
- State1 #vqstate { delta = ?BLANK_DELTA,
- q2 = bpqueue:new(),
- q3 = bpqueue:join(Q3b, Q2) };
- N when N > 0 ->
- State2 = State1 #vqstate {
- q3 = Q3b,
- delta = #delta { start_seq_id = Delta1SeqId,
- count = N,
- end_seq_id = DeltaSeqIdEnd } },
- case N == DeltaCount of
- true -> maybe_deltas_to_betas(State2);
- false -> State2
+ maybe_deltas_to_betas(
+ State #vqstate {
+ delta = Delta #delta { start_seq_id = Delta1SeqId }});
+ _ ->
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - bpqueue:len(Q3a) of
+ 0 ->
+ %% delta is now empty, but it wasn't
+ %% before, so can now join q2 onto q3
+ State1 #vqstate { delta = ?BLANK_DELTA,
+ q2 = bpqueue:new(),
+ q3 = bpqueue:join(Q3b, Q2) };
+ N when N > 0 ->
+ State1 #vqstate {
+ q3 = Q3b,
+ delta = #delta { start_seq_id = Delta1SeqId,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd } }
end
end
end.