diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-26 16:28:53 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-26 16:28:53 +0000 |
| commit | ba4d3e0e1aadcfe367b55717bce2f3ec370f44cc (patch) | |
| tree | a04eb50507a0c1ec52fc7bd4417026590c6853ed /src | |
| parent | a2191649b05773c76661ffe41a7d85e4e1c9b4e3 (diff) | |
| download | rabbitmq-server-git-ba4d3e0e1aadcfe367b55717bce2f3ec370f44cc.tar.gz | |
rewrote the qi populating the msg_store (delta function), sidestepping queue scattering. Seems to work fine.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index dd62a7ed8f..0870acc596 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -156,21 +156,14 @@ %%---------------------------------------------------------------------------- init(Name) -> - StrName = queue_name_to_dir_name(Name), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - State = #qistate { dir = Dir, - seg_num_handles = dict:new(), - journal_ack_count = 0, - journal_ack_dict = dict:new(), - seg_ack_counts = dict:new() }, + State = blank_state(Name), {TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State), scatter_journal(TotalMsgCount, State1). terminate(State = #qistate { seg_num_handles = SegHdls }) -> case 0 == dict:size(SegHdls) of true -> State; - false -> close_all_handles(full_flush_journal(State)) + false -> close_all_handles(State) end. terminate_and_erase(State) -> @@ -434,6 +427,16 @@ all_segment_nums(Dir) -> lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + #qistate { dir = Dir, + seg_num_handles = dict:new(), + journal_ack_count = 0, + journal_ack_dict = dict:new(), + seg_ack_counts = dict:new() }. + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function @@ -442,25 +445,27 @@ all_segment_nums(Dir) -> queue_index_walker([]) -> finished; queue_index_walker([QueueName|QueueNames]) -> - {TotalMsgCount, State} = init(QueueName), - {LowSeqIdSeg, _NextSeqId, State1} = - find_lowest_seq_id_seg_and_next_seq_id(State), - queue_index_walker({TotalMsgCount, LowSeqIdSeg, State1, QueueNames}); + State = blank_state(QueueName), + {Hdl, State1} = get_journal_handle(State), + JAckDict = load_journal(Hdl, dict:new()), + State2 = #qistate { dir = Dir } = + close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }), + SegNums = all_segment_nums(Dir), + queue_index_walker({SegNums, State2, QueueNames}); -queue_index_walker({0, _LowSeqIdSeg, State, QueueNames}) -> - terminate(State), +queue_index_walker({[], State, QueueNames}) -> + _State = terminate(State), queue_index_walker(QueueNames); -queue_index_walker({N, LowSeqIdSeg, State, QueueNames}) -> - {Entries, State1} = read_segment_entries(LowSeqIdSeg, State), - LowSeqIdSeg1 = LowSeqIdSeg + segment_size(), - queue_index_walker({Entries, N, LowSeqIdSeg1, State1, QueueNames}); - -queue_index_walker({[], N, LowSeqIdSeg, State, QueueNames}) -> - queue_index_walker({N, LowSeqIdSeg, State, QueueNames}); -queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries], - N, LowSeqIdSeg, State, QueueNames}) -> +queue_index_walker({[SegNum | SegNums], State, QueueNames}) -> + {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), + queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames}); + +queue_index_walker({[], State, SegNums, QueueNames}) -> + queue_index_walker({SegNums, State, QueueNames}); +queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs], + State, SegNums, QueueNames}) -> {MsgId, bool_to_int(IsPersistent), - {Entries, N - 1, LowSeqIdSeg, State, QueueNames}}. + {Msgs, State, SegNums, QueueNames}}. %%---------------------------------------------------------------------------- |
