diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 48 |
1 files changed, 33 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 34bb9920ec..e4111f82fe 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -132,7 +132,7 @@ seg_ack_counts :: dict() }). --spec(init/1 :: (string()) -> qistate()). +-spec(init/1 :: (string()) -> {non_neg_integer(), qistate()}). -spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). @@ -154,14 +154,15 @@ init(Name) -> AckCounts = scatter_journal(Dir, find_ack_counts(Dir)), {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), [raw, binary, delayed_write, write, read]), - #qistate { dir = Dir, - cur_seg_num = undefined, - cur_seg_hdl = undefined, - journal_ack_count = 0, - journal_ack_dict = dict:new(), - journal_handle = JournalHdl, - seg_ack_counts = AckCounts - }. + {find_next_seq_id(Dir), + #qistate { dir = Dir, + cur_seg_num = undefined, + cur_seg_hdl = undefined, + journal_ack_count = 0, + journal_ack_dict = dict:new(), + journal_handle = JournalHdl, + seg_ack_counts = AckCounts + }}. write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> @@ -232,7 +233,7 @@ read_segment_entries(InitSeqId, State = SegPath = seg_num_to_path(Dir, SegNum), {SDict, _AckCount} = load_segment(SegNum, SegPath, JAckDict), %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = lists:sort(fun (A, B) -> B < A end, dict:fetch_keys(SDict)), + RelSeqs = rev_sort(dict:fetch_keys(SDict)), {lists:foldl(fun (RelSeq, Acc) -> {MsgId, IsDelivered, IsPersistent} = dict:fetch(RelSeq, SDict), @@ -245,6 +246,9 @@ read_segment_entries(InitSeqId, State = %% Minor Helpers %%---------------------------------------------------------------------------- +rev_sort(List) -> + lists:sort(fun (A, B) -> B < A end, List). + close_file_handle_for_seg(_SegNum, State = #qistate { cur_seg_num = undefined }) -> State; @@ -285,12 +289,26 @@ seg_num_to_path(Dir, SegNum) -> %% Startup Functions %%---------------------------------------------------------------------------- +all_segment_nums_paths(Dir) -> + [{list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, + SegName)), filename:join(Dir, SegName)} + || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. + +find_next_seq_id(Dir) -> + SegNumsPaths = all_segment_nums_paths(Dir), + case rev_sort(SegNumsPaths) of + [] -> 0; + [{SegNum, SegPath}|_] -> + {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()), + case rev_sort(dict:fetch_keys(SDict)) of + [] -> 0; + [RelSeq|_] -> 1 + reconstruct_seq_id(SegNum, RelSeq) + end + end. + find_ack_counts(Dir) -> - SegNumsPaths = - [{list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, - SegName)), filename:join(Dir, SegName)} - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + SegNumsPaths = all_segment_nums_paths(Dir), lists:foldl( fun ({SegNum, SegPath}, Acc) -> case load_segment(SegNum, SegPath, dict:new()) of |
