summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-02 15:38:43 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-02 15:38:43 +0000
commit1ce1bdb2011928da4d8d12881ca51e624445ea97 (patch)
treef662bf4f15af1a686c4e3b6775dba06e3bfb986e
parentacc6bbb9f6b29f194ddd0e60087489147ea5234d (diff)
downloadrabbitmq-server-git-1ce1bdb2011928da4d8d12881ca51e624445ea97.tar.gz
Finished. It might work - untested though
-rw-r--r--src/rabbit_queue_index3.erl252
1 files changed, 217 insertions, 35 deletions
diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl
index 01a7c748eb..43a210d950 100644
--- a/src/rabbit_queue_index3.erl
+++ b/src/rabbit_queue_index3.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index3).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1,
+ write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -103,6 +103,41 @@
%%----------------------------------------------------------------------------
+-ifdef(use_specs).
+
+-type(hdl() :: ('undefined' | any())).
+-type(msg_id() :: binary()).
+-type(seq_id() :: integer()).
+-type(qistate() :: #qistate { dir :: file_path(),
+ segments :: dict(),
+ journal_handle :: hdl(),
+ dirty_count :: integer()
+ }).
+
+-spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}).
+-spec(terminate/1 :: (qistate()) -> qistate()).
+-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
+-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
+ -> qistate()).
+-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
+-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
+-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()).
+-spec(flush_journal/1 :: (qistate()) -> qistate()).
+-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
+ {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
+-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
+-spec(segment_size/0 :: () -> non_neg_integer()).
+-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
+ {non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok').
+
+-endif.
+
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
init(Name) ->
State = blank_state(Name),
%% 1. Load the journal completely. This will also load segments
@@ -158,46 +193,82 @@ init(Name) ->
end, 0, AllSegs),
{Count, State3}.
-terminate(State = #qistate { segments = Segments, journal_handle = JournalHdl,
- dir = Dir }) ->
- ok = case JournalHdl of
- undefined -> ok;
- _ -> file_handle_cache:close(JournalHdl)
- end,
- ok = dict:fold(
- fun (_Seg, #segment { handle = undefined }, ok) ->
- ok;
- (_Seg, #segment { handle = Hdl }, ok) ->
- file_handle_cache:close(Hdl)
- end, ok, Segments),
- store_clean_shutdown(Dir),
- State #qistate { journal_handle = undefined, segments = dict:new() }.
+terminate(State) ->
+ terminate(true, State).
terminate_and_erase(State) ->
State1 = terminate(State),
ok = delete_queue_directory(State1 #qistate.dir),
State1.
+write_published(MsgId, SeqId, IsPersistent, State)
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(JournalHdl,
+ [<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>,
+ MsgId]),
+ maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)).
+
+write_delivered(SeqId, State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(JournalHdl,
+ <<?DEL_JPREFIX:?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>),
+ maybe_flush_journal(add_to_journal(SeqId, del, State1)).
+
+write_acks(SeqIds, State) ->
+ {SeqIds1, State1} = remove_pubs_dels_from_journal(SeqIds, State),
+ case SeqIds1 of
+ [] ->
+ State;
+ _ ->
+ {JournalHdl, State2} = get_journal_handle(State1),
+ ok = file_handle_cache:append(JournalHdl,
+ [<<?ACK_JPREFIX:?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>
+ || SeqId <- SeqIds1]),
+ State3 = lists:foldl(fun (SeqId, StateN) ->
+ add_to_journal(SeqId, ack, StateN)
+ end, State2, SeqIds1),
+ maybe_flush_journal(State3)
+ end.
+
+sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ State.
+
flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
flush_journal(State = #qistate { segments = Segments }) ->
- dict:fold(
- fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
- acks = AckCount } = Segment, StateN) ->
- case dict:is_empty(JEntries) of
- true -> store_segment(Segment, StateN);
- false when AckCount == PubCount ->
- ok = delete_segment(Segment);
- false ->
- {Hdl, Segment1} = get_segment_handle(Segment),
- dict:fold(fun write_entry_to_segment/3,
- Hdl, JEntries),
- ok = file_handle_cache:sync(Hdl),
- store_segment(
- Segment1 #segment { journal_entries = dict:new() },
- StateN)
- end
- end, State, Segments).
+ State1 =
+ dict:fold(
+ fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
+ acks = AckCount } = Segment, StateN) ->
+ case dict:is_empty(JEntries) of
+ true -> store_segment(Segment, StateN);
+ false when AckCount == PubCount ->
+ ok = delete_segment(Segment);
+ false ->
+ {Hdl, Segment1} = get_segment_handle(Segment),
+ dict:fold(fun write_entry_to_segment/3,
+ Hdl, JEntries),
+ ok = file_handle_cache:sync(Hdl),
+ store_segment(
+ Segment1 #segment { journal_entries = dict:new() },
+ StateN)
+ end
+ end, State, Segments),
+ {JournalHdl, State2} = get_journal_handle(State1),
+ {ok, 0} = file_handle_cache:position(JournalHdl, bof),
+ ok = file_handle_cache:truncate(JournalHdl),
+ ok = file_handle_cache:sync(JournalHdl),
+ State2 #qistate { dirty_count = 0 }.
read_segment_entries(InitSeqId, State) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -241,10 +312,81 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
end,
{LowSeqIdSeg, NextSeqId, State}.
+start_msg_store(DurableQueues) ->
+ DurableDict =
+ dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
+ Queue #amqqueue.name} || Queue <- DurableQueues ]),
+ QueuesDir = queues_dir(),
+ Directories = case file:list_dir(QueuesDir) of
+ {ok, Entries} ->
+ [ Entry || Entry <- Entries,
+ filelib:is_dir(
+ filename:join(QueuesDir, Entry)) ];
+ {error, enoent} ->
+ []
+ end,
+ DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
+ {DurableQueueNames, TransientDirs} =
+ lists:foldl(
+ fun (QueueDir, {DurableAcc, TransientAcc}) ->
+ case sets:is_element(QueueDir, DurableDirectories) of
+ true ->
+ {[dict:fetch(QueueDir, DurableDict) | DurableAcc],
+ TransientAcc};
+ false ->
+ {DurableAcc, [QueueDir | TransientAcc]}
+ end
+ end, {[], []}, Directories),
+ MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"),
+ ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir,
+ fun queue_index_walker/1,
+ DurableQueueNames]),
+ lists:foreach(fun (DirName) ->
+ Dir = filename:join(queues_dir(), DirName),
+ ok = delete_queue_directory(Dir)
+ end, TransientDirs),
+ ok.
+
+%%----------------------------------------------------------------------------
+%% Msg Store Startup Delta Function
+%%----------------------------------------------------------------------------
+
+queue_index_walker([]) ->
+ finished;
+queue_index_walker([QueueName|QueueNames]) ->
+ State = #qistate { dir = Dir } = blank_state(QueueName),
+ State1 = #qistate { journal_handle = JHdl } = load_journal(State),
+ ok = file_handle_cache:close(JHdl),
+ SegNums = all_segment_nums(Dir),
+ queue_index_walker({SegNums, State1, QueueNames});
+
+queue_index_walker({[], State, QueueNames}) ->
+ _State = terminate(false, State),
+ queue_index_walker(QueueNames);
+queue_index_walker({[Seg | SegNums], State, QueueNames}) ->
+ SeqId = reconstruct_seq_id(Seg, 0),
+ {Messages, State1} = read_segment_entries(SeqId, State),
+ queue_index_walker({Messages, State1, SegNums, QueueNames});
+
+queue_index_walker({[], State, SegNums, QueueNames}) ->
+ queue_index_walker({SegNums, State, QueueNames});
+queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs],
+ State, SegNums, QueueNames}) ->
+ case IsPersistent of
+ true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
+ false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
+ end.
+
%%----------------------------------------------------------------------------
%% Minors
%%----------------------------------------------------------------------------
+maybe_flush_journal(State = #qistate { dirty_count = DCount })
+ when DCount > ?MAX_JOURNAL_ENTRY_COUNT ->
+ flush_journal(State);
+maybe_flush_journal(State) ->
+ State.
+
all_segment_nums(Dir) ->
lists:sort(
[list_to_integer(
@@ -370,6 +512,46 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
end,
Hdl.
+terminate(StoreShutdown, State =
+ #qistate { segments = Segments, journal_handle = JournalHdl,
+ dir = Dir }) ->
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ ok = dict:fold(
+ fun (_Seg, #segment { handle = undefined }, ok) ->
+ ok;
+ (_Seg, #segment { handle = Hdl }, ok) ->
+ file_handle_cache:close(Hdl)
+ end, ok, Segments),
+ case StoreShutdown of
+ true -> store_clean_shutdown(Dir);
+ false -> ok
+ end,
+ State #qistate { journal_handle = undefined, segments = dict:new() }.
+
+remove_pubs_dels_from_journal(SeqIds, State) ->
+ lists:foldl(
+ fun (SeqId, {SeqIdsAcc, StateN}) ->
+ {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ Segment = #segment { journal_entries = JEntries,
+ acks = AckCount } =
+ find_segment(Seg, StateN),
+ case dict:find(RelSeq, JEntries) of
+ {ok, {{_MsgId, _IsPersistent}, del, no_ack}} ->
+ StateN1 =
+ store_segment(
+ Segment #segment { journal_entries =
+ dict:erase(RelSeq, JEntries),
+ acks = AckCount + 1 },
+ StateN),
+ {SeqIdsAcc, StateN1};
+ _ ->
+ {[SeqId | SeqIdsAcc], StateN}
+ end
+ end, {[], State}, SeqIds).
+
%%----------------------------------------------------------------------------
%% Majors
%%----------------------------------------------------------------------------
@@ -451,7 +633,7 @@ load_journal(State) ->
%% We want to keep acks in so that we can remove them if
%% duplicates are in the journal. The counts here are
%% purely from the segment itself.
- {SegDict, PubCount, AckCount, StateN1} =
+ {SegDict, PubCountInSeg, AckCountInSeg, StateN1} =
load_segment(Seg, true, StateN),
%% Removed counts here are the number of pubs and acks
%% that are duplicates - i.e. found in both the segment
@@ -459,8 +641,8 @@ load_journal(State) ->
{JEntries1, PubsRemoved, AcksRemoved} =
journal_minus_segment(JEntries, SegDict),
{Segment1, StateN2} = find_segment(Seg, StateN1),
- PubCount1 = PubCount + PubCountInJournal - PubsRemoved,
- AckCount1 = AckCount + AckCountInJournal - AcksRemoved,
+ PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
+ AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
store_segment(Segment1 #segment { journal_entries = JEntries1,
pubs = PubCount1,
acks = AckCount1 }, StateN2)