summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-06 00:18:05 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-06 00:18:05 +0100
commit02c232663d4059c5a489523f027070b6d1650bb2 (patch)
treeac431fac317b910ceb6440ac76a95eb39063f0c0
parent69ac8068271a48be671699a330d12fd74c7f2779 (diff)
downloadrabbitmq-server-git-02c232663d4059c5a489523f027070b6d1650bb2.tar.gz
Fixed a leak: if we had many queue index segments which were written to disk but contained only references to transient messages, then on restart they would never be removed. Unfortunately, this does potentially introduce further work on startup: the queue will try to ensure that it loads the persistent contents of the first segment which contains a persistent message, which can involve scanning through a lot of segments. However, this is actually pretty quick. The only way to avoid this extra scanning would be to keep per-segment counters of persistent messages and ensure those are written to disk too, but the extra code cost and complexity may make this just not worth it.
-rw-r--r--src/rabbit_queue_index.erl1
-rw-r--r--src/rabbit_variable_queue.erl125
2 files changed, 77 insertions, 49 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8ad5558387..6ab370b2ab 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -250,7 +250,6 @@ init(Name, MsgStoreRecovered) ->
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
%% find the number of unacked messages.
- AllSegs =
CleanShutdown = detect_clean_shutdown(Dir),
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8585e1392c..4a4ba999ff 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -254,6 +254,9 @@
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }).
+-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
+ count = 0,
+ end_seq_id = Z }).
%%----------------------------------------------------------------------------
%% Public API
@@ -268,11 +271,11 @@ init(QueueName, PersistentStore) ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
- Delta = case DeltaCount1 of
- 0 -> ?BLANK_DELTA;
- _ -> #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
end,
Now = now(),
State =
@@ -666,19 +669,39 @@ persistent_msg_ids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
-betas_from_segment_entries(List, SeqIdLimit, TransientThreshold) ->
- bpqueue:from_list([{true,
- [#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
- SeqId < SeqIdLimit,
- (IsPersistent orelse SeqId >= TransientThreshold)]}]).
+betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
+ {Filtered, IndexState1} =
+ lists:foldr(
+ fun ({MsgId, SeqId, IsPersistent, IsDelivered},
+ {FilteredAcc, IndexStateAcc}) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true ->
+ IndexStateAcc1 =
+ case IsDelivered of
+ false -> rabbit_queue_index:write_delivered(
+ SeqId, IndexStateAcc);
+ true -> IndexStateAcc
+ end,
+ {FilteredAcc, rabbit_queue_index:write_acks(
+ [SeqId], IndexStateAcc1)};
+ false ->
+ case SeqId < SeqIdLimit of
+ true ->
+ {[#msg_status { msg = undefined,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ } | FilteredAcc],
+ IndexStateAcc};
+ false ->
+ {FilteredAcc, IndexStateAcc}
+ end
+ end
+ end, {[], IndexState}, List),
+ {bpqueue:from_list([{true, Filtered}]), IndexState1}.
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -693,10 +716,10 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
rabbit_binary_generator:ensure_content_encoded(Content)) }.
%% the first arg is the older delta
-combine_deltas(#delta { count = 0 }, #delta { count = 0 }) ->
+combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
?BLANK_DELTA;
-combine_deltas(#delta { count = 0 }, #delta { } = B) -> B;
-combine_deltas(#delta { } = A, #delta { count = 0 }) -> A;
+combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { } = B) -> B;
+combine_deltas(#delta { } = A, ?BLANK_DELTA_PATTERN(Y)) -> A;
combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow},
#delta { start_seq_id = SeqIdHigh, count = CountHigh,
end_seq_id = SeqIdEnd }) ->
@@ -748,14 +771,15 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId,
delete1(PersistentStore, TransientThreshold, NextSeqId, Count,
Delta1SeqId, IndexState1);
{List, IndexState1} ->
- Q = betas_from_segment_entries(List, Delta1SeqId,
- TransientThreshold),
- {QCount, IndexState2} =
+ {Q, IndexState2} =
+ betas_from_segment_entries(
+ List, Delta1SeqId, TransientThreshold, IndexState1),
+ {QCount, IndexState3} =
remove_queue_entries(
PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q, IndexState1),
+ Q, IndexState2),
delete1(PersistentStore, TransientThreshold, NextSeqId,
- Count + QCount, Delta1SeqId, IndexState2)
+ Count + QCount, Delta1SeqId, IndexState3)
end.
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
@@ -1132,14 +1156,14 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
{Qa, Reduction1, State #vqstate { index_state = IndexState1,
ram_index_count = RamIndexCount1 }}.
-maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) ->
+maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(
State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
target_ram_msg_count = TargetRamMsgCount,
- delta = #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd },
+ delta = Delta = #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd },
transient_threshold = TransientThreshold}) ->
case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of
true ->
@@ -1150,28 +1174,33 @@ maybe_deltas_to_betas(
%% really be holding all the betas in memory.
{List, IndexState1, Delta1SeqId} =
read_index_segment(DeltaSeqId, IndexState),
- State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. It
%% could be [] if we ignored every message in the segment
%% due to it being transient and below the threshold
- Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd, TransientThreshold),
- Q3b = bpqueue:join(Q3, Q3a),
- case DeltaCount - bpqueue:len(Q3a) of
+ {Q3a, IndexState2} =
+ betas_from_segment_entries(
+ List, DeltaSeqIdEnd, TransientThreshold, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2 },
+ case bpqueue:len(Q3a) of
0 ->
- %% delta is now empty, but it wasn't before, so
- %% can now join q2 onto q3
- State1 #vqstate { delta = ?BLANK_DELTA,
- q2 = bpqueue:new(),
- q3 = bpqueue:join(Q3b, Q2) };
- N when N > 0 ->
- State2 = State1 #vqstate {
- q3 = Q3b,
- delta = #delta { start_seq_id = Delta1SeqId,
- count = N,
- end_seq_id = DeltaSeqIdEnd } },
- case N == DeltaCount of
- true -> maybe_deltas_to_betas(State2);
- false -> State2
+ maybe_deltas_to_betas(
+ State #vqstate {
+ delta = Delta #delta { start_seq_id = Delta1SeqId }});
+ _ ->
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - bpqueue:len(Q3a) of
+ 0 ->
+ %% delta is now empty, but it wasn't
+ %% before, so can now join q2 onto q3
+ State1 #vqstate { delta = ?BLANK_DELTA,
+ q2 = bpqueue:new(),
+ q3 = bpqueue:join(Q3b, Q2) };
+ N when N > 0 ->
+ State1 #vqstate {
+ q3 = Q3b,
+ delta = #delta { start_seq_id = Delta1SeqId,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd } }
end
end
end.