summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-26 16:28:53 +0000
committerMatthew Sackman <matthew@lshift.net>2009-10-26 16:28:53 +0000
commitba4d3e0e1aadcfe367b55717bce2f3ec370f44cc (patch)
treea04eb50507a0c1ec52fc7bd4417026590c6853ed /src
parenta2191649b05773c76661ffe41a7d85e4e1c9b4e3 (diff)
downloadrabbitmq-server-git-ba4d3e0e1aadcfe367b55717bce2f3ec370f44cc.tar.gz
rewrote the qi populating the msg_store (delta function), sidestepping queue scattering. Seems to work fine.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl55
1 files changed, 30 insertions, 25 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index dd62a7ed8f..0870acc596 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -156,21 +156,14 @@
%%----------------------------------------------------------------------------
init(Name) ->
- StrName = queue_name_to_dir_name(Name),
- Dir = filename:join(queues_dir(), StrName),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- State = #qistate { dir = Dir,
- seg_num_handles = dict:new(),
- journal_ack_count = 0,
- journal_ack_dict = dict:new(),
- seg_ack_counts = dict:new() },
+ State = blank_state(Name),
{TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State),
scatter_journal(TotalMsgCount, State1).
terminate(State = #qistate { seg_num_handles = SegHdls }) ->
case 0 == dict:size(SegHdls) of
true -> State;
- false -> close_all_handles(full_flush_journal(State))
+ false -> close_all_handles(State)
end.
terminate_and_erase(State) ->
@@ -434,6 +427,16 @@ all_segment_nums(Dir) ->
lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
|| SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
+blank_state(QueueName) ->
+ StrName = queue_name_to_dir_name(QueueName),
+ Dir = filename:join(queues_dir(), StrName),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ #qistate { dir = Dir,
+ seg_num_handles = dict:new(),
+ journal_ack_count = 0,
+ journal_ack_dict = dict:new(),
+ seg_ack_counts = dict:new() }.
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
@@ -442,25 +445,27 @@ all_segment_nums(Dir) ->
queue_index_walker([]) ->
finished;
queue_index_walker([QueueName|QueueNames]) ->
- {TotalMsgCount, State} = init(QueueName),
- {LowSeqIdSeg, _NextSeqId, State1} =
- find_lowest_seq_id_seg_and_next_seq_id(State),
- queue_index_walker({TotalMsgCount, LowSeqIdSeg, State1, QueueNames});
+ State = blank_state(QueueName),
+ {Hdl, State1} = get_journal_handle(State),
+ JAckDict = load_journal(Hdl, dict:new()),
+ State2 = #qistate { dir = Dir } =
+ close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }),
+ SegNums = all_segment_nums(Dir),
+ queue_index_walker({SegNums, State2, QueueNames});
-queue_index_walker({0, _LowSeqIdSeg, State, QueueNames}) ->
- terminate(State),
+queue_index_walker({[], State, QueueNames}) ->
+ _State = terminate(State),
queue_index_walker(QueueNames);
-queue_index_walker({N, LowSeqIdSeg, State, QueueNames}) ->
- {Entries, State1} = read_segment_entries(LowSeqIdSeg, State),
- LowSeqIdSeg1 = LowSeqIdSeg + segment_size(),
- queue_index_walker({Entries, N, LowSeqIdSeg1, State1, QueueNames});
-
-queue_index_walker({[], N, LowSeqIdSeg, State, QueueNames}) ->
- queue_index_walker({N, LowSeqIdSeg, State, QueueNames});
-queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries],
- N, LowSeqIdSeg, State, QueueNames}) ->
+queue_index_walker({[SegNum | SegNums], State, QueueNames}) ->
+ {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State),
+ queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames});
+
+queue_index_walker({[], State, SegNums, QueueNames}) ->
+ queue_index_walker({SegNums, State, QueueNames});
+queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs],
+ State, SegNums, QueueNames}) ->
{MsgId, bool_to_int(IsPersistent),
- {Entries, N - 1, LowSeqIdSeg, State, QueueNames}}.
+ {Msgs, State, SegNums, QueueNames}}.
%%----------------------------------------------------------------------------