summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2010-05-18 12:53:54 +0100
committer: Matthew Sackman <matthew@lshift.net> 2010-05-18 12:53:54 +0100
commit: afdeb07ef3abf056976078ea3b68a3cb6a94f266 (patch)
tree: a0d59ce73ac8002d5399bc830972b4c67840b346 /src
parent: c75f81a640cff9ecb4a6e4e5c25b207c9399d94d (diff)
download: rabbitmq-server-git-afdeb07ef3abf056976078ea3b68a3cb6a94f266.tar.gz
Reworked reading from the queue to still be limited to a maximum of one segment, but with a more natural start+end interface
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_queue_index.erl 122
-rw-r--r-- src/rabbit_tests.erl 6
-rw-r--r-- src/rabbit_variable_queue.erl 83
3 files changed, 120 insertions, 91 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a6753910b1..7cf36193f6 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,8 +32,8 @@
-module(rabbit_queue_index).
-export([init/3, terminate/2, terminate_and_erase/1, publish/4,
- deliver/2, ack/2, sync/2, flush/1, read_segment_entries/2,
- next_segment_boundary/1, current_segment_boundary/1, bounds/1,
+ deliver/2, ack/2, sync/2, flush/1, read/3,
+ current_segment_boundary/1, next_segment_boundary/1, bounds/1,
recover/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -83,15 +83,15 @@
%% journal is still held in this mapping). Actions are stored directly
%% in this state. Thus at the point of flushing the journal, firstly
%% no reading from disk is necessary, but secondly if the known number
-%% of acks and publishes in a segment qare equal, given the known
-%% state of the segment file combined with the journal, no writing
-%% needs to be done to the segment file either (in fact it is deleted
-%% if it exists at all). This is safe given that the set of acks is a
-%% subset of the set of publishes. When it's necessary to sync
-%% messages because of transactions, it's only necessary to fsync on
-%% the journal: when entries are distributed from the journal to
-%% segment files, those segments appended to are fsync'd prior to the
-%% journal being truncated.
+%% of acks and publishes in a segment are equal, given the known state
+%% of the segment file combined with the journal, no writing needs to
+%% be done to the segment file either (in fact it is deleted if it
+%% exists at all). This is safe given that the set of acks is a subset
+%% of the set of publishes. When it's necessary to sync messages
+%% because of transactions, it's only necessary to fsync on the
+%% journal: when entries are distributed from the journal to segment
+%% files, those segments appended to are fsync'd prior to the journal
+%% being truncated.
%%
%% This module is also responsible for scanning the queue index files
%% and seeding the message store on start up.
@@ -191,10 +191,11 @@
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
--spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {[{guid(), seq_id(), boolean(), boolean()}], qistate()}).
--spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
+-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
+ {[{guid(), seq_id(), boolean(), boolean()}],
+ seq_id() | 'undefined', qistate()}).
-spec(current_segment_boundary/1 :: (seq_id()) -> seq_id()).
+-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([queue_name()]) -> {[[any()]], startup_fun_state()}).
@@ -332,33 +333,50 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
flush(State) -> flush_journal(State).
-read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
- dir = Dir }) ->
- {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- Segment = segment_find_or_new(Seg, Dir, Segments),
- {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment),
+read(StartEnd, StartEnd, State) ->
+ {[], undefined, State};
+read(Start, End, State = #qistate { segments = Segments,
+ dir = Dir }) when Start =< End ->
+ %% Start is inclusive, End is exclusive.
+ {StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
+ {EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End),
+ Start1 = reconstruct_seq_id(StartSeg + 1, 0),
+ Again = case End =< Start1 of
+ true -> undefined;
+ false -> Start1
+ end,
+ MaxRelSeq = case StartSeg =:= EndSeg of
+ true -> EndRelSeq;
+ false -> ?SEGMENT_ENTRY_COUNT
+ end,
+ Segment = segment_find_or_new(StartSeg, Dir, Segments),
+ {SegEntries, _PubCount, _AckCount, Segment1} =
+ load_segment(false, StartRelSeq, MaxRelSeq, Segment),
#segment { journal_entries = JEntries } = Segment1,
{array:sparse_foldr(
fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) ->
- [ {Guid, reconstruct_seq_id(Seg, RelSeq),
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
IsPersistent, IsDelivered == del} | Acc ]
- end, [], journal_plus_segment(JEntries, SegEntries)),
+ end, [],
+ journal_plus_segment(JEntries, SegEntries, StartRelSeq, MaxRelSeq)),
+ Again,
State #qistate { segments = segment_store(Segment1, Segments) }}.
-next_segment_boundary(SeqId) ->
- {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- reconstruct_seq_id(Seg + 1, 0).
-
current_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
reconstruct_seq_id(Seg, 0).
+next_segment_boundary(SeqId) ->
+ {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ reconstruct_seq_id(Seg + 1, 0).
+
bounds(State) ->
SegNums = all_segment_nums(State),
- %% We don't want the lowest seq_id, merely the seq_id of the start
- %% of the lowest segment. That seq_id may not actually exist, but
- %% that's fine. The important thing is that the segment exists and
- %% the seq_id reported is on a segment boundary.
+ %% Don't bother trying to figure out the lowest seq_id, merely the
+ %% seq_id of the start of the lowest segment. That seq_id may not
+ %% actually exist, but that's fine. The important thing is that
+ %% the segment exists and the seq_id reported is on a segment
+ %% boundary.
%% We also don't really care about the max seq_id. Just start the
%% next segment: it makes life much easier.
@@ -460,7 +478,7 @@ terminate(StoreShutdown, Terms, State =
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
{SegEntries, PubCount, AckCount, Segment1} =
- load_segment(false, Segment),
+ load_segment(false, 0, ?SEGMENT_ENTRY_COUNT, Segment),
array:sparse_foldl(
fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
@@ -518,7 +536,8 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State1 = lists:foldl(
fun (Seg, State2) ->
SeqId = reconstruct_seq_id(Seg, 0),
- {Messages, State3} = read_segment_entries(SeqId, State2),
+ {Messages, undefined, State3} =
+ read(SeqId, next_segment_boundary(SeqId), State2),
[ok = gatherer:in(Gatherer, {Guid, 1}) ||
{Guid, _SeqId, true, _IsDelivered} <- Messages],
State3
@@ -633,7 +652,7 @@ load_journal(State) ->
%% them if duplicates are in the journal. The counts
%% here are purely from the segment itself.
{SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
- load_segment(true, Segment),
+ load_segment(true, 0, ?SEGMENT_ENTRY_COUNT, Segment),
%% Removed counts here are the number of pubs and
%% acks that are duplicates - i.e. found in both the
%% segment and journal.
@@ -806,8 +825,9 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
%% false, then array:sparse_size(SegEntries) == PubCount -
%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries)
-%% == PubCount.
-load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) ->
+%% == PubCount. StartRelSeq is inclusive, EndRelSeq is exclusive.
+load_segment(KeepAcks, StartRelSeq, EndRelSeq,
+ Segment = #segment { path = Path, handle = SegHdl }) ->
SegmentExists = case SegHdl of
undefined -> filelib:is_file(Path);
_ -> true
@@ -817,20 +837,24 @@ load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) ->
true -> {Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
{SegEntries, PubCount, AckCount} =
- load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0),
+ load_segment_entries(KeepAcks, StartRelSeq, EndRelSeq, Hdl,
+ array_new(), 0, 0),
{SegEntries, PubCount, AckCount, Segment1}
end.
-load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
+load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount,
+ AckCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>}
+ when StartRel =< RelSeq andalso RelSeq < EndRel ->
{AckCount1, SegEntries1} =
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries),
- load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount,
- AckCount1);
+ load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1,
+ PubCount, AckCount1);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>}
+ when StartRel =< RelSeq andalso RelSeq < EndRel ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
{ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
@@ -838,8 +862,11 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
array:set(RelSeq,
{{Guid, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries),
- load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1,
- AckCount);
+ load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1,
+ PubCount + 1, AckCount);
+ {ok, _SomeBinary} ->
+ load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries,
+ PubCount, AckCount);
_ErrOrEoF ->
{SegEntries, PubCount, AckCount}
end.
@@ -867,15 +894,18 @@ bool_to_int(false) -> 0.
%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no
%% duplicates. Used when providing segment entries to the variable
-%% queue.
-journal_plus_segment(JEntries, SegEntries) ->
+%% queue. RelStart is inclusive, RelEnd is exclusive.
+journal_plus_segment(JEntries, SegEntries, RelStart, RelEnd) ->
array:sparse_foldl(
- fun (RelSeq, JObj, SegEntriesOut) ->
+ fun (RelSeq, JObj, SegEntriesOut)
+ when RelStart =< RelSeq andalso RelSeq < RelEnd ->
SegEntry = array:get(RelSeq, SegEntriesOut),
case journal_plus_segment1(JObj, SegEntry) of
undefined -> array:reset(RelSeq, SegEntriesOut);
Obj -> array:set(RelSeq, Obj, SegEntriesOut)
- end
+ end;
+ (_RelSeq, _JObj, SegEntriesOut) ->
+ SegEntriesOut
end, SegEntries, JEntries).
%% Here, the result is the item which we may be adding to (for items
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 358f857b12..9821367d59 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1458,7 +1458,7 @@ test_queue_index() ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
- {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3),
+ {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
%% call terminate twice to prove it's idempotent
@@ -1470,7 +1470,7 @@ test_queue_index() ->
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
- {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
+ {ReadB, undefined, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
@@ -1481,7 +1481,7 @@ test_queue_index() ->
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
- {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
+ {ReadC, undefined, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 856b1f0c36..992cf19a72 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -175,9 +175,9 @@
}).
-record(delta,
- { start_seq_id,
+ { start_seq_id, %% start_seq_id is inclusive
count,
- end_seq_id %% note the end_seq_id is always >, not >=
+ end_seq_id %% end_seq_id is exclusive
}).
-record(tx, { pending_messages, pending_acks }).
@@ -797,7 +797,7 @@ persistent_guids(Pubs) ->
[Guid || Obj = #basic_message { guid = Guid } <- Pubs,
Obj #basic_message.is_persistent].
-betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
+betas_from_segment_entries(List, TransientThreshold, IndexState) ->
{Filtered, IndexState1} =
lists:foldr(
fun ({Guid, SeqId, IsPersistent, IsDelivered},
@@ -811,28 +811,27 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
end,
{FilteredAcc, rabbit_queue_index:ack(
[SeqId], IndexStateAcc1)};
- false -> case SeqId < SeqIdLimit of
- true -> {[#msg_status {
- msg = undefined,
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- } | FilteredAcc],
- IndexStateAcc};
- false -> {FilteredAcc, IndexStateAcc}
- end
+ false -> {[#msg_status { msg = undefined,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ } | FilteredAcc],
+ IndexStateAcc}
end
end, {[], IndexState}, List),
{bpqueue:from_list([{true, Filtered}]), IndexState1}.
-read_index_segment(SeqId, IndexState) ->
- SeqId1 = rabbit_queue_index:next_segment_boundary(SeqId),
- case rabbit_queue_index:read_segment_entries(SeqId, IndexState) of
- {[], IndexState1} -> read_index_segment(SeqId1, IndexState1);
- {List, IndexState1} -> {List, IndexState1, SeqId1}
+read_one_index_segment(StartSeqId, EndSeqId, IndexState)
+ when StartSeqId =< EndSeqId ->
+ case rabbit_queue_index:read(StartSeqId, EndSeqId, IndexState) of
+ {List, Again, IndexState1} when List /= [] orelse Again =:= undefined ->
+ {List, IndexState1,
+ rabbit_queue_index:next_segment_boundary(StartSeqId)};
+ {[], StartSeqId1, IndexState1} ->
+ read_one_index_segment(StartSeqId1, EndSeqId, IndexState1)
end.
ensure_binary_properties(Msg = #basic_message { content = Content }) ->
@@ -959,26 +958,27 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId,
- IndexState) when DeltaSeqId >= NextSeqId ->
+ IndexState) when DeltaSeqId =:= undefined
+ orelse DeltaSeqId >= NextSeqId ->
{Count, IndexState};
delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId,
IndexState) ->
- Delta1SeqId = rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of
- {[], IndexState1} ->
- delete1(PersistentStore, TransientThreshold, NextSeqId, Count,
- Delta1SeqId, IndexState1);
- {List, IndexState1} ->
- {Q, IndexState2} =
- betas_from_segment_entries(
- List, Delta1SeqId, TransientThreshold, IndexState1),
- {QCount, IndexState3} =
- remove_queue_entries(
- PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q, IndexState2),
- delete1(PersistentStore, TransientThreshold, NextSeqId,
- Count + QCount, Delta1SeqId, IndexState3)
- end.
+ {List, Again, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState),
+ {IndexState2, Count1} =
+ case List of
+ [] -> {IndexState1, Count};
+ _ -> {Q, IndexState3} =
+ betas_from_segment_entries(
+ List, TransientThreshold, IndexState1),
+ {Count2, IndexState4} =
+ remove_queue_entries(
+ PersistentStore, fun beta_fold_no_index_on_disk/3,
+ Q, IndexState3),
+ {IndexState4, Count2 + Count}
+ end,
+ delete1(PersistentStore, TransientThreshold, NextSeqId, Count1, Again,
+ IndexState2).
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
persistent_store = PersistentStore }) ->
@@ -1370,13 +1370,12 @@ maybe_deltas_to_betas(
%% segment, or TargetRamMsgCount > 0, meaning we should
%% really be holding all the betas in memory.
{List, IndexState1, Delta1SeqId} =
- read_index_segment(DeltaSeqId, IndexState),
+ read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState),
%% length(List) may be < segment_size because of acks. It
%% could be [] if we ignored every message in the segment
%% due to it being transient and below the threshold
- {Q3a, IndexState2} =
- betas_from_segment_entries(
- List, DeltaSeqIdEnd, TransientThreshold, IndexState1),
+ {Q3a, IndexState2} = betas_from_segment_entries(
+ List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case bpqueue:len(Q3a) of
0 ->