summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 15:51:14 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 15:51:14 +0100
commit4d85abee069bcfab253a7a9040ddcb08c6aa05b1 (patch)
treecddd61eaf535bca27490218eb668edf9ae893d55
parent2e57b7e5812fe6932263354a07410cd8956f28e4 (diff)
downloadrabbitmq-server-git-4d85abee069bcfab253a7a9040ddcb08c6aa05b1.tar.gz
queue_index keeps track of which messages have been published but not written to disk
There are 2 ways queue_index writes something to disk: - the journal is synced, and - the journal is flushed (i.e. scattered across many files and those files closed). Since queue_index is the same process as variable_queue, we just keep a list of guids that have been published and we just call an on_sync handler whenever these are written to disk.
-rw-r--r--src/rabbit_queue_index.erl51
-rw-r--r--src/rabbit_variable_queue.erl3
2 files changed, 35 insertions, 19 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb2889..a82da57ae7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/5, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -164,7 +164,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -187,15 +187,18 @@
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: fun (([rabbit_guid:guid()]) -> ok),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
- A}).
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
+ A}).
--spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
- {'undefined' | non_neg_integer(), [any()], qistate()}).
+-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()),
+ fun (([seq_id()]) -> ok))
+ -> {'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
@@ -220,20 +223,23 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name, not Recover),
+init(Name, Recover, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
+ State
+ = #qistate { dir = Dir }
+ = blank_state(Name, not Recover),
+ State1 = State #qistate { on_sync = OnSyncFun, unsynced_guids = [] },
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end,
CleanShutdown = detect_clean_shutdown(Dir),
- {Count, State1} =
+ {Count, State2} =
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end,
- {Count, Terms, State1}.
+ {Count, Terms, State2}.
terminate(Terms, State) ->
{SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
@@ -253,7 +259,9 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
+ State2 = State#qistate { unsynced_guids =
+ [Guid | State#qistate.unsynced_guids] },
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -265,7 +273,8 @@ sync([], State) ->
State;
sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+sync(SeqIds, State = #qistate { journal_handle = JournalHdl,
+ on_sync = OnSyncFun }) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -275,7 +284,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ OnSyncFun(State#qistate.unsynced_guids),
+ State#qistate { unsynced_guids = [] }.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -557,7 +567,9 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount,
maybe_flush_journal(State) ->
State.
-flush_journal(State = #qistate { segments = Segments }) ->
+flush_journal(State = #qistate { segments = Segments,
+ on_sync = OnSyncFun,
+ unsynced_guids = UGs }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
@@ -572,7 +584,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ OnSyncFun(UGs),
+ State1 #qistate { dirty_count = 0, unsynced_guids = [] }.
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e4020b6069..2b0919a1f3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -375,6 +375,9 @@ init(QueueName, IsDurable, Recover) ->
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end,
+ fun (Guids) ->
+ rabbit_log:info("message indices ~p commited to disk~n", [Guids])
end),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),