diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-08 17:38:21 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-08 17:38:21 +0100 |
| commit | 0c9a7ab8868bfe47682c4ef05eb283b23ef852e5 (patch) | |
| tree | 330de6f75e957bc535e1bc5e8909ae221a7381e2 /src | |
| parent | 1ecb08930cfd4cfad92ac382fd8208c8f889a0d2 (diff) | |
| download | rabbitmq-server-git-0c9a7ab8868bfe47682c4ef05eb283b23ef852e5.tar.gz | |
The calculation of the highest seq id in the index queue was wrong - it was simply returning the highest unacked seq id, instead of looking for the highest seq id ever encountered. This could have led to reuse of seq ids. We also need to know the total message count in the queue index which is the number of unacked msgs recorded in the index, and we also need the seq id of the segment boundary of the segment containing the first msg in the queue. This is so that we can form the inital gamma correctly.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 11 |
2 files changed, 66 insertions, 42 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b21651a234..98ab2d7787 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -133,7 +133,8 @@ seg_ack_counts :: dict() }). --spec(init/1 :: (string()) -> {non_neg_integer(), qistate()}). +-spec(init/1 :: (string()) -> {non_neg_integer(), non_neg_integer(), + non_neg_integer(), qistate()}). -spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). @@ -154,10 +155,12 @@ init(Name) -> Dir = filename:join(queues_dir(), Name), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - AckCounts = scatter_journal(Dir, find_ack_counts(Dir)), + {AckCounts, TotalMsgCount} = scatter_journal(Dir, find_ack_counts(Dir)), {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), [raw, binary, delayed_write, write, read]), - {find_next_seq_id(Dir), + {LowestSeqIdSeg, HighestSeqId} = + find_lowest_seq_id_seg_and_highest_seq_id(Dir), + {LowestSeqIdSeg, HighestSeqId + 1, TotalMsgCount, #qistate { dir = Dir, cur_seg_num = undefined, cur_seg_hdl = undefined, @@ -234,7 +237,7 @@ read_segment_entries(InitSeqId, State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), SegPath = seg_num_to_path(Dir, SegNum), - {SDict, _AckCount} = load_segment(SegNum, SegPath, JAckDict), + {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, JAckDict), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SDict)), {lists:foldl(fun (RelSeq, Acc) -> @@ -308,39 +311,52 @@ all_segment_nums_paths(Dir) -> SegName)), filename:join(Dir, SegName)} || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. -find_next_seq_id(Dir) -> +find_lowest_seq_id_seg_and_highest_seq_id(Dir) -> SegNumsPaths = all_segment_nums_paths(Dir), - case rev_sort(SegNumsPaths) of - [] -> 0; - [{SegNum, SegPath}|_] -> - {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()), - case rev_sort(dict:fetch_keys(SDict)) of - [] -> 0; - [RelSeq|_] -> 1 + reconstruct_seq_id(SegNum, RelSeq) - end - end. + %% We don't want the lowest seq_id, merely the seq_id of the start + %% of the lowest segment. That seq_id may not actually exist, but + %% that's fine. The important thing is that the segment exists and + %% the seq_id reported is on a segment boundary. + LowSeqIdSeg = + case lists:sort(SegNumsPaths) of + [] -> 0; + [{SegNum1, _SegPath1}|_] -> reconstruct_seq_id(SegNum1, 0) + end, + HighestSeqId = + case rev_sort(SegNumsPaths) of + [] -> 0; + [{SegNum2, SegPath2}|_] -> + {_SDict, _AckCount, HighRelSeq} = + load_segment(SegNum2, SegPath2, dict:new()), + reconstruct_seq_id(SegNum2, HighRelSeq) + end, + {LowSeqIdSeg, HighestSeqId}. find_ack_counts(Dir) -> SegNumsPaths = all_segment_nums_paths(Dir), lists:foldl( - fun ({SegNum, SegPath}, Acc) -> - case load_segment(SegNum, SegPath, dict:new()) of - {_SDict, 0} -> Acc; - {_SDict, AckCount} -> dict:store(SegNum, AckCount, Acc) - end - end, dict:new(), SegNumsPaths). - -scatter_journal(Dir, AckCounts) -> + fun ({SegNum, SegPath}, {AccCount, AccDict}) -> + {SDict, AckCount, _HighRelSeq} = + load_segment(SegNum, SegPath, dict:new()), + {dict:size(SDict) + AccCount, + case AckCount of + 0 -> AccDict; + _ -> dict:store(SegNum, AckCount, AccDict) + end} + end, {0, dict:new()}, SegNumsPaths). + +scatter_journal(Dir, {TotalMsgCount, AckCounts}) -> JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), case file:open(JournalPath, [read, read_ahead, raw, binary]) of {error, enoent} -> AckCounts; {ok, Hdl} -> ADict = load_journal(Hdl, dict:new()), ok = file:close(Hdl), - {AckCounts1, _Dir} = dict:fold(fun replay_journal_acks_to_segment/3, - {AckCounts, Dir}, ADict), + {AckCounts1, TotalMsgCount1, _Dir} = + dict:fold(fun replay_journal_acks_to_segment/3, + {AckCounts, TotalMsgCount, Dir}, ADict), ok = file:delete(JournalPath), - AckCounts1 + {AckCounts1, TotalMsgCount1} end. load_journal(Hdl, ADict) -> @@ -354,15 +370,15 @@ add_ack_to_ack_dict(SeqId, ADict) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). -replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) -> +replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) -> SegPath = seg_num_to_path(Dir, SegNum), - {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()), + {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks)), {append_acks_to_segment(SegPath, SegNum, AckCounts, sets:to_list(ValidAcks)), - Dir}. + TotalMsgCount - sets:size(ValidAcks), Dir}. %%---------------------------------------------------------------------------- @@ -371,28 +387,30 @@ replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, Dir}) -> load_segment(SegNum, SegPath, JAckDict) -> case file:open(SegPath, [raw, binary, read_ahead, read]) of - {error, enoent} -> {dict:new(), 0}; + {error, enoent} -> {dict:new(), 0, 0}; {ok, Hdl} -> - {SDict, AckCount} = - load_segment_entries(SegNum, Hdl, {dict:new(), 0}), + {SDict, AckCount, HighRelSeq} = + load_segment_entries(SegNum, Hdl, {dict:new(), 0, 0}), ok = file:close(Hdl), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; error -> [] end, - lists:foldl(fun (RelSeq, {SDict1, AckCount1}) -> - {dict:erase(RelSeq, SDict1), AckCount1+1} - end, {SDict, AckCount}, RelSeqs) + {SDict1, AckCount1} = + lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> + {dict:erase(RelSeq, SDict2), AckCount2 + 1} + end, {SDict, AckCount}, RelSeqs), + {SDict1, AckCount1, HighRelSeq} end. -load_segment_entries(SegNum, Hdl, {SDict, AckCount}) -> +load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) -> case file:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB/bitstring>>} -> {ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>, - load_segment_entries(SegNum, Hdl, - deliver_or_ack_msg(SDict, AckCount, RelSeq)); + {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), + load_segment_entries(SegNum, Hdl, {SDict1, AckCount1, HighRelSeq}); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB/bitstring>>} -> %% because we specify /binary, and binaries are complete @@ -400,11 +418,12 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount}) -> {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>, + HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), load_segment_entries( SegNum, Hdl, {dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), AckCount}); - _ErrOrEoF -> {SDict, AckCount} + SDict), AckCount, HighRelSeq1}); + _ErrOrEoF -> {SDict, AckCount, HighRelSeq} end. deliver_or_ack_msg(SDict, AckCount, RelSeq) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 85dfbbac5b..1491879bc9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -81,7 +81,7 @@ %% in ram), and gamma is just a count of the number of index entries %% on disk at that stage (msg on disk, index on disk). %% -%% When a msg arrives, we decide which form it should be in. It is +%% When a msg arrives, we decide in which form it should be. It is %% then added to the rightmost appropriate queue, maintaining %% order. Thus if the msg is to be an alpha, it will be added to q1, %% unless all of q1, q2, gamma and q3 are empty, in which case it will @@ -103,9 +103,14 @@ %% is non empty then q3 must be non empty. init(QueueName) -> - {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName), + {LowSeqId, NextSeqId, Count, IndexState} = + rabbit_queue_index:init(QueueName), + Gamma = case Count of + 0 -> #gamma { seq_id = undefined, count = 0 }; + _ -> #gamma { seq_id = LowSeqId, count = Count } + end, #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = #gamma { seq_id = undefined, count = 0 }, + gamma = Gamma, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, ram_msg_count = 0, |
