summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl97
-rw-r--r--src/rabbit_variable_queue.erl11
2 files changed, 66 insertions, 42 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b21651a234..98ab2d7787 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -133,7 +133,8 @@
seg_ack_counts :: dict()
}).
--spec(init/1 :: (string()) -> {non_neg_integer(), qistate()}).
+-spec(init/1 :: (string()) -> {non_neg_integer(), non_neg_integer(),
+ non_neg_integer(), qistate()}).
-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
@@ -154,10 +155,12 @@
init(Name) ->
Dir = filename:join(queues_dir(), Name),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- AckCounts = scatter_journal(Dir, find_ack_counts(Dir)),
+ {AckCounts, TotalMsgCount} = scatter_journal(Dir, find_ack_counts(Dir)),
{ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME),
[raw, binary, delayed_write, write, read]),
- {find_next_seq_id(Dir),
+ {LowestSeqIdSeg, HighestSeqId} =
+ find_lowest_seq_id_seg_and_highest_seq_id(Dir),
+ {LowestSeqIdSeg, HighestSeqId + 1, TotalMsgCount,
#qistate { dir = Dir,
cur_seg_num = undefined,
cur_seg_hdl = undefined,
@@ -234,7 +237,7 @@ read_segment_entries(InitSeqId, State =
#qistate { dir = Dir, journal_ack_dict = JAckDict }) ->
{SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
SegPath = seg_num_to_path(Dir, SegNum),
- {SDict, _AckCount} = load_segment(SegNum, SegPath, JAckDict),
+ {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, JAckDict),
%% deliberately sort the list desc, because foldl will reverse it
RelSeqs = rev_sort(dict:fetch_keys(SDict)),
{lists:foldl(fun (RelSeq, Acc) ->
@@ -308,39 +311,52 @@ all_segment_nums_paths(Dir) ->
SegName)), filename:join(Dir, SegName)}
|| SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
-find_next_seq_id(Dir) ->
+find_lowest_seq_id_seg_and_highest_seq_id(Dir) ->
SegNumsPaths = all_segment_nums_paths(Dir),
- case rev_sort(SegNumsPaths) of
- [] -> 0;
- [{SegNum, SegPath}|_] ->
- {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()),
- case rev_sort(dict:fetch_keys(SDict)) of
- [] -> 0;
- [RelSeq|_] -> 1 + reconstruct_seq_id(SegNum, RelSeq)
- end
- end.
+ %% We don't want the lowest seq_id, merely the seq_id of the start
+ %% of the lowest segment. That seq_id may not actually exist, but
+ %% that's fine. The important thing is that the segment exists and
+ %% the seq_id reported is on a segment boundary.
+ LowSeqIdSeg =
+ case lists:sort(SegNumsPaths) of
+ [] -> 0;
+ [{SegNum1, _SegPath1}|_] -> reconstruct_seq_id(SegNum1, 0)
+ end,
+ HighestSeqId =
+ case rev_sort(SegNumsPaths) of
+ [] -> 0;
+ [{SegNum2, SegPath2}|_] ->
+ {_SDict, _AckCount, HighRelSeq} =
+ load_segment(SegNum2, SegPath2, dict:new()),
+ reconstruct_seq_id(SegNum2, HighRelSeq)
+ end,
+ {LowSeqIdSeg, HighestSeqId}.
find_ack_counts(Dir) ->
SegNumsPaths = all_segment_nums_paths(Dir),
lists:foldl(
- fun ({SegNum, SegPath}, Acc) ->
- case load_segment(SegNum, SegPath, dict:new()) of
- {_SDict, 0} -> Acc;
- {_SDict, AckCount} -> dict:store(SegNum, AckCount, Acc)
- end
- end, dict:new(), SegNumsPaths).
-
-scatter_journal(Dir, AckCounts) ->
+ fun ({SegNum, SegPath}, {AccCount, AccDict}) ->
+ {SDict, AckCount, _HighRelSeq} =
+ load_segment(SegNum, SegPath, dict:new()),
+ {dict:size(SDict) + AccCount,
+ case AckCount of
+ 0 -> AccDict;
+ _ -> dict:store(SegNum, AckCount, AccDict)
+ end}
+ end, {0, dict:new()}, SegNumsPaths).
+
+scatter_journal(Dir, {TotalMsgCount, AckCounts}) ->
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
case file:open(JournalPath, [read, read_ahead, raw, binary]) of
{error, enoent} -> AckCounts;
{ok, Hdl} ->
ADict = load_journal(Hdl, dict:new()),
ok = file:close(Hdl),
- {AckCounts1, _Dir} = dict:fold(fun replay_journal_acks_to_segment/3,
- {AckCounts, Dir}, ADict),
+ {AckCounts1, TotalMsgCount1, _Dir} =
+ dict:fold(fun replay_journal_acks_to_segment/3,
+ {AckCounts, TotalMsgCount, Dir}, ADict),
ok = file:delete(JournalPath),
- AckCounts1
+ {AckCounts1, TotalMsgCount1}
end.
load_journal(Hdl, ADict) ->
@@ -354,15 +370,15 @@ add_ack_to_ack_dict(SeqId, ADict) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
-replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) ->
+replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) ->
SegPath = seg_num_to_path(Dir, SegNum),
- {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()),
+ {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()),
ValidRelSeqIds = dict:fetch_keys(SDict),
ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds),
sets:from_list(Acks)),
{append_acks_to_segment(SegPath, SegNum, AckCounts,
sets:to_list(ValidAcks)),
- Dir}.
+ TotalMsgCount - sets:size(ValidAcks), Dir}.
%%----------------------------------------------------------------------------
@@ -371,28 +387,30 @@ replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) ->
load_segment(SegNum, SegPath, JAckDict) ->
case file:open(SegPath, [raw, binary, read_ahead, read]) of
- {error, enoent} -> {dict:new(), 0};
+ {error, enoent} -> {dict:new(), 0, 0};
{ok, Hdl} ->
- {SDict, AckCount} =
- load_segment_entries(SegNum, Hdl, {dict:new(), 0}),
+ {SDict, AckCount, HighRelSeq} =
+ load_segment_entries(SegNum, Hdl, {dict:new(), 0, 0}),
ok = file:close(Hdl),
RelSeqs = case dict:find(SegNum, JAckDict) of
{ok, RelSeqs1} -> RelSeqs1;
error -> []
end,
- lists:foldl(fun (RelSeq, {SDict1, AckCount1}) ->
- {dict:erase(RelSeq, SDict1), AckCount1+1}
- end, {SDict, AckCount}, RelSeqs)
+ {SDict1, AckCount1} =
+ lists:foldl(fun (RelSeq, {SDict2, AckCount2}) ->
+ {dict:erase(RelSeq, SDict2), AckCount2 + 1}
+ end, {SDict, AckCount}, RelSeqs),
+ {SDict1, AckCount1, HighRelSeq}
end.
-load_segment_entries(SegNum, Hdl, {SDict, AckCount}) ->
+load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) ->
case file:read(Hdl, 1) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB/bitstring>>} ->
{ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>,
- load_segment_entries(SegNum, Hdl,
- deliver_or_ack_msg(SDict, AckCount, RelSeq));
+ {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
+ load_segment_entries(SegNum, Hdl, {SDict1, AckCount1, HighRelSeq});
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, MSB/bitstring>>} ->
%% because we specify /binary, and binaries are complete
@@ -400,11 +418,12 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount}) ->
{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>,
+ HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(
SegNum, Hdl, {dict:store(RelSeq, {MsgId, false,
1 == IsPersistentNum},
- SDict), AckCount});
- _ErrOrEoF -> {SDict, AckCount}
+ SDict), AckCount, HighRelSeq1});
+ _ErrOrEoF -> {SDict, AckCount, HighRelSeq}
end.
deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 85dfbbac5b..1491879bc9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -81,7 +81,7 @@
%% in ram), and gamma is just a count of the number of index entries
%% on disk at that stage (msg on disk, index on disk).
%%
-%% When a msg arrives, we decide which form it should be in. It is
+%% When a msg arrives, we decide in which form it should be. It is
%% then added to the rightmost appropriate queue, maintaining
%% order. Thus if the msg is to be an alpha, it will be added to q1,
%% unless all of q1, q2, gamma and q3 are empty, in which case it will
@@ -103,9 +103,14 @@
%% is non empty then q3 must be non empty.
init(QueueName) ->
- {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName),
+ {LowSeqId, NextSeqId, Count, IndexState} =
+ rabbit_queue_index:init(QueueName),
+ Gamma = case Count of
+ 0 -> #gamma { seq_id = undefined, count = 0 };
+ _ -> #gamma { seq_id = LowSeqId, count = Count }
+ end,
#vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = #gamma { seq_id = undefined, count = 0 },
+ gamma = Gamma,
q3 = queue:new(), q4 = queue:new(),
target_ram_msg_count = undefined,
ram_msg_count = 0,