diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 15:51:14 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 15:51:14 +0100 |
| commit | 4d85abee069bcfab253a7a9040ddcb08c6aa05b1 (patch) | |
| tree | cddd61eaf535bca27490218eb668edf9ae893d55 | |
| parent | 2e57b7e5812fe6932263354a07410cd8956f28e4 (diff) | |
| download | rabbitmq-server-git-4d85abee069bcfab253a7a9040ddcb08c6aa05b1.tar.gz | |
queue_index keeps track of which messages have been published but not written to disk
There are 2 ways queue_index writes something to disk:
- the journal is synced, and
- the journal is flushed (i.e. scattered across many files and those
files closed).
Since queue_index is the same process as variable_queue, we just keep
a list of guids that have been published and we just call an on_sync
handler whenever these are written to disk.
| -rw-r--r-- | src/rabbit_queue_index.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 3 |
2 files changed, 35 insertions, 19 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6b8bb2889..a82da57ae7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/5, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -164,7 +164,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -187,15 +187,18 @@ segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: fun (([rabbit_guid:guid()]) -> ok), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), - A}). + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), + A}). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), + fun (([seq_id()]) -> ok)) + -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> @@ -220,20 +223,23 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name, not Recover), +init(Name, Recover, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> + State + = #qistate { dir = Dir } + = blank_state(Name, not Recover), + State1 = State #qistate { on_sync = OnSyncFun, unsynced_guids = [] }, Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = + {Count, State2} = case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + init_clean(RecoveredCounts, State1); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end, - {Count, Terms, State1}. + {Count, Terms, State2}. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -253,7 +259,9 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + State2 = State#qistate { unsynced_guids = + [Guid | State#qistate.unsynced_guids] }, + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -265,7 +273,8 @@ sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +sync(SeqIds, State = #qistate { journal_handle = JournalHdl, + on_sync = OnSyncFun }) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -275,7 +284,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + OnSyncFun(State#qistate.unsynced_guids), + State#qistate { unsynced_guids = [] }. flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -557,7 +567,9 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount, maybe_flush_journal(State) -> State. -flush_journal(State = #qistate { segments = Segments }) -> +flush_journal(State = #qistate { segments = Segments, + on_sync = OnSyncFun, + unsynced_guids = UGs }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -572,7 +584,8 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + OnSyncFun(UGs), + State1 #qistate { dirty_count = 0, unsynced_guids = [] }. append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e4020b6069..2b0919a1f3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -375,6 +375,9 @@ init(QueueName, IsDurable, Recover) -> rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, + fun (Guids) -> + rabbit_log:info("message indices ~p commited to disk~n", [Guids]) end), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), |
