diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-02 15:38:43 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-02 15:38:43 +0000 |
| commit | 1ce1bdb2011928da4d8d12881ca51e624445ea97 (patch) | |
| tree | f662bf4f15af1a686c4e3b6775dba06e3bfb986e | |
| parent | acc6bbb9f6b29f194ddd0e60087489147ea5234d (diff) | |
| download | rabbitmq-server-git-1ce1bdb2011928da4d8d12881ca51e624445ea97.tar.gz | |
Finished. It might work - untested though
| -rw-r--r-- | src/rabbit_queue_index3.erl | 252 |
1 files changed, 217 insertions, 35 deletions
diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl index 01a7c748eb..43a210d950 100644 --- a/src/rabbit_queue_index3.erl +++ b/src/rabbit_queue_index3.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index3). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1, + write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -103,6 +103,41 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-type(hdl() :: ('undefined' | any())). +-type(msg_id() :: binary()). +-type(seq_id() :: integer()). +-type(qistate() :: #qistate { dir :: file_path(), + segments :: dict(), + journal_handle :: hdl(), + dirty_count :: integer() + }). + +-spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). +-spec(terminate/1 :: (qistate()) -> qistate()). +-spec(terminate_and_erase/1 :: (qistate()) -> qistate()). +-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) + -> qistate()). +-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). +-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(flush_journal/1 :: (qistate()) -> qistate()). +-spec(read_segment_entries/2 :: (seq_id(), qistate()) -> + {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). +-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(segment_size/0 :: () -> non_neg_integer()). +-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> + {non_neg_integer(), non_neg_integer(), qistate()}). +-spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok'). + +-endif. + + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + init(Name) -> State = blank_state(Name), %% 1. Load the journal completely. This will also load segments @@ -158,46 +193,82 @@ init(Name) -> end, 0, AllSegs), {Count, State3}. -terminate(State = #qistate { segments = Segments, journal_handle = JournalHdl, - dir = Dir }) -> - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - ok = dict:fold( - fun (_Seg, #segment { handle = undefined }, ok) -> - ok; - (_Seg, #segment { handle = Hdl }, ok) -> - file_handle_cache:close(Hdl) - end, ok, Segments), - store_clean_shutdown(Dir), - State #qistate { journal_handle = undefined, segments = dict:new() }. +terminate(State) -> + terminate(true, State). terminate_and_erase(State) -> State1 = terminate(State), ok = delete_queue_directory(State1 #qistate.dir), State1. +write_published(MsgId, SeqId, IsPersistent, State) + when is_binary(MsgId) -> + ?MSG_ID_BYTES = size(MsgId), + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + [<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, + MsgId]), + maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). + +write_delivered(SeqId, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + <<?DEL_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>>), + maybe_flush_journal(add_to_journal(SeqId, del, State1)). + +write_acks(SeqIds, State) -> + {SeqIds1, State1} = remove_pubs_dels_from_journal(SeqIds, State), + case SeqIds1 of + [] -> + State; + _ -> + {JournalHdl, State2} = get_journal_handle(State1), + ok = file_handle_cache:append(JournalHdl, + [<<?ACK_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>> + || SeqId <- SeqIds1]), + State3 = lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, ack, StateN) + end, State2, SeqIds1), + maybe_flush_journal(State3) + end. + +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> + State; +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + State. + flush_journal(State = #qistate { dirty_count = 0 }) -> State; flush_journal(State = #qistate { segments = Segments }) -> - dict:fold( - fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, - acks = AckCount } = Segment, StateN) -> - case dict:is_empty(JEntries) of - true -> store_segment(Segment, StateN); - false when AckCount == PubCount -> - ok = delete_segment(Segment); - false -> - {Hdl, Segment1} = get_segment_handle(Segment), - dict:fold(fun write_entry_to_segment/3, - Hdl, JEntries), - ok = file_handle_cache:sync(Hdl), - store_segment( - Segment1 #segment { journal_entries = dict:new() }, - StateN) - end - end, State, Segments). + State1 = + dict:fold( + fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, + acks = AckCount } = Segment, StateN) -> + case dict:is_empty(JEntries) of + true -> store_segment(Segment, StateN); + false when AckCount == PubCount -> + ok = delete_segment(Segment); + false -> + {Hdl, Segment1} = get_segment_handle(Segment), + dict:fold(fun write_entry_to_segment/3, + Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + store_segment( + Segment1 #segment { journal_entries = dict:new() }, + StateN) + end + end, State, Segments), + {JournalHdl, State2} = get_journal_handle(State1), + {ok, 0} = file_handle_cache:position(JournalHdl, bof), + ok = file_handle_cache:truncate(JournalHdl), + ok = file_handle_cache:sync(JournalHdl), + State2 #qistate { dirty_count = 0 }. read_segment_entries(InitSeqId, State) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -241,10 +312,81 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> end, {LowSeqIdSeg, NextSeqId, State}. +start_msg_store(DurableQueues) -> + DurableDict = + dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), + Queue #amqqueue.name} || Queue <- DurableQueues ]), + QueuesDir = queues_dir(), + Directories = case file:list_dir(QueuesDir) of + {ok, Entries} -> + [ Entry || Entry <- Entries, + filelib:is_dir( + filename:join(QueuesDir, Entry)) ]; + {error, enoent} -> + [] + end, + DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), + {DurableQueueNames, TransientDirs} = + lists:foldl( + fun (QueueDir, {DurableAcc, TransientAcc}) -> + case sets:is_element(QueueDir, DurableDirectories) of + true -> + {[dict:fetch(QueueDir, DurableDict) | DurableAcc], + TransientAcc}; + false -> + {DurableAcc, [QueueDir | TransientAcc]} + end + end, {[], []}, Directories), + MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"), + ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir, + fun queue_index_walker/1, + DurableQueueNames]), + lists:foreach(fun (DirName) -> + Dir = filename:join(queues_dir(), DirName), + ok = delete_queue_directory(Dir) + end, TransientDirs), + ok. + +%%---------------------------------------------------------------------------- +%% Msg Store Startup Delta Function +%%---------------------------------------------------------------------------- + +queue_index_walker([]) -> + finished; +queue_index_walker([QueueName|QueueNames]) -> + State = #qistate { dir = Dir } = blank_state(QueueName), + State1 = #qistate { journal_handle = JHdl } = load_journal(State), + ok = file_handle_cache:close(JHdl), + SegNums = all_segment_nums(Dir), + queue_index_walker({SegNums, State1, QueueNames}); + +queue_index_walker({[], State, QueueNames}) -> + _State = terminate(false, State), + queue_index_walker(QueueNames); +queue_index_walker({[Seg | SegNums], State, QueueNames}) -> + SeqId = reconstruct_seq_id(Seg, 0), + {Messages, State1} = read_segment_entries(SeqId, State), + queue_index_walker({Messages, State1, SegNums, QueueNames}); + +queue_index_walker({[], State, SegNums, QueueNames}) -> + queue_index_walker({SegNums, State, QueueNames}); +queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], + State, SegNums, QueueNames}) -> + case IsPersistent of + true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; + false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) + end. + %%---------------------------------------------------------------------------- %% Minors %%---------------------------------------------------------------------------- +maybe_flush_journal(State = #qistate { dirty_count = DCount }) + when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> + flush_journal(State); +maybe_flush_journal(State) -> + State. + all_segment_nums(Dir) -> lists:sort( [list_to_integer( @@ -370,6 +512,46 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> end, Hdl. +terminate(StoreShutdown, State = + #qistate { segments = Segments, journal_handle = JournalHdl, + dir = Dir }) -> + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + ok = dict:fold( + fun (_Seg, #segment { handle = undefined }, ok) -> + ok; + (_Seg, #segment { handle = Hdl }, ok) -> + file_handle_cache:close(Hdl) + end, ok, Segments), + case StoreShutdown of + true -> store_clean_shutdown(Dir); + false -> ok + end, + State #qistate { journal_handle = undefined, segments = dict:new() }. + +remove_pubs_dels_from_journal(SeqIds, State) -> + lists:foldl( + fun (SeqId, {SeqIdsAcc, StateN}) -> + {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + Segment = #segment { journal_entries = JEntries, + acks = AckCount } = + find_segment(Seg, StateN), + case dict:find(RelSeq, JEntries) of + {ok, {{_MsgId, _IsPersistent}, del, no_ack}} -> + StateN1 = + store_segment( + Segment #segment { journal_entries = + dict:erase(RelSeq, JEntries), + acks = AckCount + 1 }, + StateN), + {SeqIdsAcc, StateN1}; + _ -> + {[SeqId | SeqIdsAcc], StateN} + end + end, {[], State}, SeqIds). + %%---------------------------------------------------------------------------- %% Majors %%---------------------------------------------------------------------------- @@ -451,7 +633,7 @@ load_journal(State) -> %% We want to keep acks in so that we can remove them if %% duplicates are in the journal. The counts here are %% purely from the segment itself. - {SegDict, PubCount, AckCount, StateN1} = + {SegDict, PubCountInSeg, AckCountInSeg, StateN1} = load_segment(Seg, true, StateN), %% Removed counts here are the number of pubs and acks %% that are duplicates - i.e. found in both the segment @@ -459,8 +641,8 @@ load_journal(State) -> {JEntries1, PubsRemoved, AcksRemoved} = journal_minus_segment(JEntries, SegDict), {Segment1, StateN2} = find_segment(Seg, StateN1), - PubCount1 = PubCount + PubCountInJournal - PubsRemoved, - AckCount1 = AckCount + AckCountInJournal - AcksRemoved, + PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, + AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, store_segment(Segment1 #segment { journal_entries = JEntries1, pubs = PubCount1, acks = AckCount1 }, StateN2) |
