diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gatherer.erl | 142 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 51 |
3 files changed, 213 insertions, 55 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl new file mode 100644 index 0000000000..8c44388c40 --- /dev/null +++ b/src/gatherer.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(gatherer). + +-behaviour(gen_server2). + +-export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(wait_on/2 :: (pid(), any()) -> 'ok'). +-spec(produce/2 :: (pid(), any()) -> 'ok'). +-spec(finished/2 :: (pid(), any()) -> 'ok'). +-spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +-record(gstate, { waiting_on, results, blocking }). + +%%---------------------------------------------------------------------------- + +wait_on(Pid, Token) -> + gen_server2:call(Pid, {wait_on, Token}, infinity). + +produce(Pid, Result) -> + gen_server2:cast(Pid, {produce, Result}). + +finished(Pid, Token) -> + gen_server2:call(Pid, {finished, Token}, infinity). + +fetch(Pid) -> + gen_server2:call(Pid, fetch, infinity). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). + +init([]) -> + {ok, #gstate { waiting_on = sets:new(), results = queue:new(), + blocking = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) -> + {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) }, + hibernate}; + +handle_call({finished, Token}, _From, + State = #gstate { waiting_on = Tokens, results = Results, + blocking = Blocking }) -> + Tokens1 = sets:del_element(Token, Tokens), + State1 = State #gstate { waiting_on = Tokens1 }, + case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso + not queue:is_empty(Blocking) of + true -> {stop, normal, ok, State1}; + false -> {reply, ok, State1, hibernate} + end; + +handle_call(fetch, From, State = + #gstate { blocking = Blocking, results = Results, + waiting_on = Tokens }) -> + case queue:out(Results) of + {empty, _Results} -> + case sets:size(Tokens) of + 0 -> {stop, normal, finished, State}; + _ -> {noreply, + State #gstate { blocking = queue:in(From, Blocking) }, + hibernate} + end; + {{value, Result}, Results1} -> + {reply, {value, Result}, State #gstate { results = Results1 }, + hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({produce, Result}, State = #gstate { blocking = Blocking, + results = Results }) -> + {noreply, case queue:out(Blocking) of + {empty, _Blocking} -> + State #gstate { results = queue:in(Result, Results) }; + {{value, Blocked}, Blocking1} -> + gen_server2:reply(Blocked, {value, Result}), + State #gstate { blocking = Blocking1 } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State = #gstate { blocking = Blocking } ) -> + [gen_server2:reply(Blocked, finished) + || Blocked <- queue:to_list(Blocking) ], + State. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5610b35e34..82a9ddd71d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,8 @@ -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/1, client_terminate/1, clean/2]). --export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal +-export([sync/1, gc_done/4, set_maximum_since_use/2, + build_index_worker/6]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). @@ -467,8 +468,6 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = %%---------------------------------------------------------------------------- init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> - process_flag(trap_exit, true), - ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), @@ -527,6 +526,8 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), + process_flag(trap_exit, true), + {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, FileSummaryEts), @@ -1182,23 +1183,44 @@ find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. -build_index([], State) -> - build_index(undefined, [State #msstate.current_file], State); build_index(Files, State) -> - {Offset, State1} = build_index(undefined, Files, State), - {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}. - -build_index(Left, [], State = #msstate { file_summary_ets = FileSummaryEts }) -> - ok = index_delete_by_file(undefined, State), - Offset = case ets:lookup(FileSummaryEts, Left) of - [] -> 0; - [#file_summary { file_size = FileSize }] -> FileSize - end, - {Offset, State #msstate { current_file = Left }}; -build_index(Left, [File|Files], - State = #msstate { dir = Dir, sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts }) -> + {ok, Pid} = gatherer:start_link(), + case Files of + [] -> build_index(Pid, undefined, [State #msstate.current_file], State); + _ -> {Offset, State1} = build_index(Pid, undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} + end. + +build_index(Gatherer, Left, [], + State = #msstate { file_summary_ets = FileSummaryEts, + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> + case gatherer:fetch(Gatherer) of + finished -> + ok = index_delete_by_file(undefined, State), + Offset = case ets:lookup(FileSummaryEts, Left) of + [] -> 0; + [#file_summary { file_size = FileSize }] -> FileSize + end, + {Offset, State #msstate { current_file = Left }}; + {value, FileSummary = + #file_summary { valid_total_size = ValidTotalSize, + file_size = FileSize }} -> + true = ets:insert_new(FileSummaryEts, FileSummary), + build_index(Gatherer, Left, [], + State #msstate { + sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize }) + end; +build_index(Gatherer, Left, [File|Files], State) -> + Child = make_ref(), + ok = gatherer:wait_on(Gatherer, Child), + ok = worker_pool:submit_async({?MODULE, build_index_worker, + [Gatherer, Child, State, Left, File, Files]}), + build_index(Gatherer, File, Files, State). + +build_index_worker( + Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) -> {ok, Messages, FileSize} = rabbit_msg_store_misc:scan_file_for_valid_messages( Dir, rabbit_msg_store_misc:filenum_to_name(File)), @@ -1231,15 +1253,12 @@ build_index(Left, [File|Files], end}; [F|_] -> {F, FileSize} end, - true = - ets:insert_new(FileSummaryEts, #file_summary { - file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = false, - left = Left, right = Right, file_size = FileSize1, - readers = 0 }), - build_index(File, Files, - State #msstate { sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize1 }). + ok = gatherer:produce(Gatherer, #file_summary { + file = File, valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, locked = false, + left = Left, right = Right, file_size = FileSize1, + readers = 0 }), + ok = gatherer:finished(Gatherer, Guid). %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 935f275451..2a94adf77f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -422,48 +422,45 @@ start_persistent_msg_store(DurableQueues) -> %%---------------------------------------------------------------------------- queue_index_walker(DurableQueues) when is_list(DurableQueues) -> - queue_index_walker({DurableQueues, sets:new()}); - -queue_index_walker({[], Kids}) -> - case sets:size(Kids) of - 0 -> finished; - _ -> receive - {found, MsgId, Count} -> - {MsgId, Count, {[], Kids}}; - {finished, Child} -> - queue_index_walker({[], sets:del_element(Child, Kids)}) - end + {ok, Pid} = gatherer:start_link(), + queue_index_walker({DurableQueues, Pid}); + +queue_index_walker({[], Gatherer}) -> + case gatherer:fetch(Gatherer) of + finished -> finished; + {value, {MsgId, Count}} -> {MsgId, Count, {[], Gatherer}} end; -queue_index_walker({[QueueName | QueueNames], Kids}) -> +queue_index_walker({[QueueName | QueueNames], Gatherer}) -> Child = make_ref(), + ok = gatherer:wait_on(Gatherer, Child), ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader, - [QueueName, self(), Child]}), - queue_index_walker({QueueNames, sets:add_element(Child, Kids)}). + [QueueName, Gatherer, Child]}), + queue_index_walker({QueueNames, Gatherer}). -queue_index_walker_reader(QueueName, Parent, Guid) -> +queue_index_walker_reader(QueueName, Gatherer, Guid) -> State = blank_state(QueueName), State1 = load_journal(State), SegNums = all_segment_nums(State1), - queue_index_walker_reader(Parent, Guid, State1, SegNums). + queue_index_walker_reader(Gatherer, Guid, State1, SegNums). -queue_index_walker_reader(Parent, Guid, State, []) -> +queue_index_walker_reader(Gatherer, Guid, State, []) -> _State = terminate(false, State), - Parent ! {finished, Guid}; -queue_index_walker_reader(Parent, Guid, State, [Seg | SegNums]) -> + ok = gatherer:finished(Gatherer, Guid); +queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) -> SeqId = reconstruct_seq_id(Seg, 0), {Messages, State1} = read_segment_entries(SeqId, State), - queue_index_walker_reader(Parent, Guid, SegNums, State1, Messages). + State2 = queue_index_walker_reader1(Gatherer, State1, Messages), + queue_index_walker_reader(Gatherer, Guid, State2, SegNums). -queue_index_walker_reader(Parent, Guid, SegNums, State, []) -> - queue_index_walker_reader(Parent, Guid, State, SegNums); -queue_index_walker_reader( - Parent, Guid, SegNums, State, - [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> +queue_index_walker_reader1(_Gatherer, State, []) -> + State; +queue_index_walker_reader1( + Gatherer, State, [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> case IsPersistent of - true -> Parent ! {found, MsgId, 1}; + true -> gatherer:produce(Gatherer, {MsgId, 1}); false -> ok end, - queue_index_walker_reader(Parent, Guid, SegNums, State, Msgs). + queue_index_walker_reader1(Gatherer, State, Msgs). %%---------------------------------------------------------------------------- %% Minors |
