diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 11 |
2 files changed, 66 insertions, 42 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b21651a234..98ab2d7787 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -133,7 +133,8 @@ seg_ack_counts :: dict() }). --spec(init/1 :: (string()) -> {non_neg_integer(), qistate()}). +-spec(init/1 :: (string()) -> {non_neg_integer(), non_neg_integer(), + non_neg_integer(), qistate()}). -spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). @@ -154,10 +155,12 @@ init(Name) -> Dir = filename:join(queues_dir(), Name), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - AckCounts = scatter_journal(Dir, find_ack_counts(Dir)), + {AckCounts, TotalMsgCount} = scatter_journal(Dir, find_ack_counts(Dir)), {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), [raw, binary, delayed_write, write, read]), - {find_next_seq_id(Dir), + {LowestSeqIdSeg, HighestSeqId} = + find_lowest_seq_id_seg_and_highest_seq_id(Dir), + {LowestSeqIdSeg, HighestSeqId + 1, TotalMsgCount, #qistate { dir = Dir, cur_seg_num = undefined, cur_seg_hdl = undefined, @@ -234,7 +237,7 @@ read_segment_entries(InitSeqId, State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), SegPath = seg_num_to_path(Dir, SegNum), - {SDict, _AckCount} = load_segment(SegNum, SegPath, JAckDict), + {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, JAckDict), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SDict)), {lists:foldl(fun (RelSeq, Acc) -> @@ -308,39 +311,52 @@ all_segment_nums_paths(Dir) -> SegName)), filename:join(Dir, SegName)} || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. -find_next_seq_id(Dir) -> +find_lowest_seq_id_seg_and_highest_seq_id(Dir) -> SegNumsPaths = all_segment_nums_paths(Dir), - case rev_sort(SegNumsPaths) of - [] -> 0; - [{SegNum, SegPath}|_] -> - {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()), - case rev_sort(dict:fetch_keys(SDict)) of - [] -> 0; - [RelSeq|_] -> 1 + reconstruct_seq_id(SegNum, RelSeq) - end - end. + %% We don't want the lowest seq_id, merely the seq_id of the start + %% of the lowest segment. That seq_id may not actually exist, but + %% that's fine. The important thing is that the segment exists and + %% the seq_id reported is on a segment boundary. + LowSeqIdSeg = + case lists:sort(SegNumsPaths) of + [] -> 0; + [{SegNum1, _SegPath1}|_] -> reconstruct_seq_id(SegNum1, 0) + end, + HighestSeqId = + case rev_sort(SegNumsPaths) of + [] -> 0; + [{SegNum2, SegPath2}|_] -> + {_SDict, _AckCount, HighRelSeq} = + load_segment(SegNum2, SegPath2, dict:new()), + reconstruct_seq_id(SegNum2, HighRelSeq) + end, + {LowSeqIdSeg, HighestSeqId}. find_ack_counts(Dir) -> SegNumsPaths = all_segment_nums_paths(Dir), lists:foldl( - fun ({SegNum, SegPath}, Acc) -> - case load_segment(SegNum, SegPath, dict:new()) of - {_SDict, 0} -> Acc; - {_SDict, AckCount} -> dict:store(SegNum, AckCount, Acc) - end - end, dict:new(), SegNumsPaths). - -scatter_journal(Dir, AckCounts) -> + fun ({SegNum, SegPath}, {AccCount, AccDict}) -> + {SDict, AckCount, _HighRelSeq} = + load_segment(SegNum, SegPath, dict:new()), + {dict:size(SDict) + AccCount, + case AckCount of + 0 -> AccDict; + _ -> dict:store(SegNum, AckCount, AccDict) + end} + end, {0, dict:new()}, SegNumsPaths). + +scatter_journal(Dir, {TotalMsgCount, AckCounts}) -> JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), case file:open(JournalPath, [read, read_ahead, raw, binary]) of {error, enoent} -> AckCounts; {ok, Hdl} -> ADict = load_journal(Hdl, dict:new()), ok = file:close(Hdl), - {AckCounts1, _Dir} = dict:fold(fun replay_journal_acks_to_segment/3, - {AckCounts, Dir}, ADict), + {AckCounts1, TotalMsgCount1, _Dir} = + dict:fold(fun replay_journal_acks_to_segment/3, + {AckCounts, TotalMsgCount, Dir}, ADict), ok = file:delete(JournalPath), - AckCounts1 + {AckCounts1, TotalMsgCount1} end. load_journal(Hdl, ADict) -> @@ -354,15 +370,15 @@ add_ack_to_ack_dict(SeqId, ADict) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). -replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) -> +replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) -> SegPath = seg_num_to_path(Dir, SegNum), - {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()), + {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks)), {append_acks_to_segment(SegPath, SegNum, AckCounts, sets:to_list(ValidAcks)), - Dir}. + TotalMsgCount - sets:size(ValidAcks), Dir}. %%---------------------------------------------------------------------------- @@ -371,28 +387,30 @@ replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) -> load_segment(SegNum, SegPath, JAckDict) -> case file:open(SegPath, [raw, binary, read_ahead, read]) of - {error, enoent} -> {dict:new(), 0}; + {error, enoent} -> {dict:new(), 0, 0}; {ok, Hdl} -> - {SDict, AckCount} = - load_segment_entries(SegNum, Hdl, {dict:new(), 0}), + {SDict, AckCount, HighRelSeq} = + load_segment_entries(SegNum, Hdl, {dict:new(), 0, 0}), ok = file:close(Hdl), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; error -> [] end, - lists:foldl(fun (RelSeq, {SDict1, AckCount1}) -> - {dict:erase(RelSeq, SDict1), AckCount1+1} - end, {SDict, AckCount}, RelSeqs) + {SDict1, AckCount1} = + lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> + {dict:erase(RelSeq, SDict2), AckCount2 + 1} + end, {SDict, AckCount}, RelSeqs), + {SDict1, AckCount1, HighRelSeq} end. -load_segment_entries(SegNum, Hdl, {SDict, AckCount}) -> +load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) -> case file:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB/bitstring>>} -> {ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>, - load_segment_entries(SegNum, Hdl, - deliver_or_ack_msg(SDict, AckCount, RelSeq)); + {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), + load_segment_entries(SegNum, Hdl, {SDict1, AckCount1, HighRelSeq}); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB/bitstring>>} -> %% because we specify /binary, and binaries are complete @@ -400,11 +418,12 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount}) -> {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>, + HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), load_segment_entries( SegNum, Hdl, {dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), AckCount}); - _ErrOrEoF -> {SDict, AckCount} + SDict), AckCount, HighRelSeq1}); + _ErrOrEoF -> {SDict, AckCount, HighRelSeq} end. deliver_or_ack_msg(SDict, AckCount, RelSeq) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 85dfbbac5b..1491879bc9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -81,7 +81,7 @@ %% in ram), and gamma is just a count of the number of index entries %% on disk at that stage (msg on disk, index on disk). %% -%% When a msg arrives, we decide which form it should be in. It is +%% When a msg arrives, we decide in which form it should be. It is %% then added to the rightmost appropriate queue, maintaining %% order. Thus if the msg is to be an alpha, it will be added to q1, %% unless all of q1, q2, gamma and q3 are empty, in which case it will @@ -103,9 +103,14 @@ %% is non empty then q3 must be non empty. init(QueueName) -> - {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName), + {LowSeqId, NextSeqId, Count, IndexState} = + rabbit_queue_index:init(QueueName), + Gamma = case Count of + 0 -> #gamma { seq_id = undefined, count = 0 }; + _ -> #gamma { seq_id = LowSeqId, count = Count } + end, #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = #gamma { seq_id = undefined, count = 0 }, + gamma = Gamma, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, ram_msg_count = 0, |
