summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-07 13:52:48 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-07 13:52:48 +0100
commit25b0c82ec68026260ae7fad8dbe2ea1859bc1034 (patch)
treeaa4e7dc0ec227dfe6293a5eea6d84f43bbf21b3f
parentd14be783c9dc0ebb674f5fce0f654b78a97647bf (diff)
downloadrabbitmq-server-git-25b0c82ec68026260ae7fad8dbe2ea1859bc1034.tar.gz
Make init include in its result the next sequence id to use
-rw-r--r--src/rabbit_queue_index.erl48
1 files changed, 33 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 34bb9920ec..e4111f82fe 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -132,7 +132,7 @@
seg_ack_counts :: dict()
}).
--spec(init/1 :: (string()) -> qistate()).
+-spec(init/1 :: (string()) -> {non_neg_integer(), qistate()}).
-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
@@ -154,14 +154,15 @@ init(Name) ->
AckCounts = scatter_journal(Dir, find_ack_counts(Dir)),
{ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME),
[raw, binary, delayed_write, write, read]),
- #qistate { dir = Dir,
- cur_seg_num = undefined,
- cur_seg_hdl = undefined,
- journal_ack_count = 0,
- journal_ack_dict = dict:new(),
- journal_handle = JournalHdl,
- seg_ack_counts = AckCounts
- }.
+ {find_next_seq_id(Dir),
+ #qistate { dir = Dir,
+ cur_seg_num = undefined,
+ cur_seg_hdl = undefined,
+ journal_ack_count = 0,
+ journal_ack_dict = dict:new(),
+ journal_handle = JournalHdl,
+ seg_ack_counts = AckCounts
+ }}.
write_published(MsgId, SeqId, IsPersistent, State)
when is_binary(MsgId) ->
@@ -232,7 +233,7 @@ read_segment_entries(InitSeqId, State =
SegPath = seg_num_to_path(Dir, SegNum),
{SDict, _AckCount} = load_segment(SegNum, SegPath, JAckDict),
%% deliberately sort the list desc, because foldl will reverse it
- RelSeqs = lists:sort(fun (A, B) -> B < A end, dict:fetch_keys(SDict)),
+ RelSeqs = rev_sort(dict:fetch_keys(SDict)),
{lists:foldl(fun (RelSeq, Acc) ->
{MsgId, IsDelivered, IsPersistent} =
dict:fetch(RelSeq, SDict),
@@ -245,6 +246,9 @@ read_segment_entries(InitSeqId, State =
%% Minor Helpers
%%----------------------------------------------------------------------------
+rev_sort(List) ->
+ lists:sort(fun (A, B) -> B < A end, List).
+
close_file_handle_for_seg(_SegNum,
State = #qistate { cur_seg_num = undefined }) ->
State;
@@ -285,12 +289,26 @@ seg_num_to_path(Dir, SegNum) ->
%% Startup Functions
%%----------------------------------------------------------------------------
+all_segment_nums_paths(Dir) ->
+ [{list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
+ SegName)), filename:join(Dir, SegName)}
+ || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
+
+find_next_seq_id(Dir) ->
+ SegNumsPaths = all_segment_nums_paths(Dir),
+ case rev_sort(SegNumsPaths) of
+ [] -> 0;
+ [{SegNum, SegPath}|_] ->
+ {SDict, _AckCount} = load_segment(SegNum, SegPath, dict:new()),
+ case rev_sort(dict:fetch_keys(SDict)) of
+ [] -> 0;
+ [RelSeq|_] -> 1 + reconstruct_seq_id(SegNum, RelSeq)
+ end
+ end.
+
find_ack_counts(Dir) ->
- SegNumsPaths =
- [{list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
- SegName)), filename:join(Dir, SegName)}
- || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ SegNumsPaths = all_segment_nums_paths(Dir),
lists:foldl(
fun ({SegNum, SegPath}, Acc) ->
case load_segment(SegNum, SegPath, dict:new()) of