summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gatherer.erl142
-rw-r--r--src/rabbit_msg_store.erl75
-rw-r--r--src/rabbit_queue_index.erl51
3 files changed, 213 insertions, 55 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
new file mode 100644
index 0000000000..8c44388c40
--- /dev/null
+++ b/src/gatherer.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(gatherer).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(wait_on/2 :: (pid(), any()) -> 'ok').
+-spec(produce/2 :: (pid(), any()) -> 'ok').
+-spec(finished/2 :: (pid(), any()) -> 'ok').
+-spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+-record(gstate, { waiting_on, results, blocking }).
+
+%%----------------------------------------------------------------------------
+
%% Record Token as an outstanding producer. fetch/1 will block rather
%% than return 'finished' while any waited-on token has not yet called
%% finished/2. Synchronous; the gatherer replies 'ok'.
wait_on(Pid, Token) ->
    gen_server2:call(Pid, {wait_on, Token}, infinity).
+
%% Hand one result to the gatherer. Asynchronous: the result is either
%% relayed directly to a blocked fetcher or queued for a later fetch/1.
produce(Pid, Result) ->
    gen_server2:cast(Pid, {produce, Result}).
+
%% Declare that the producer identified by Token (previously passed to
%% wait_on/2) will produce no more results. Synchronous; returns 'ok'.
finished(Pid, Token) ->
    gen_server2:call(Pid, {finished, Token}, infinity).
+
%% Retrieve the next result as {value, Result}, or 'finished' once all
%% waited-on tokens have called finished/2 and no results remain queued.
%% Blocks (infinite timeout) while producers are still outstanding.
fetch(Pid) ->
    gen_server2:call(Pid, fetch, infinity).
+
+%%----------------------------------------------------------------------------
+
%% Start an anonymous (unregistered) gatherer linked to the caller, with
%% no timeout on initialisation.
start_link() ->
    gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+
%% gen_server2 callback. Initial state: no outstanding producer tokens,
%% no queued results, no blocked fetchers. Hibernates immediately, then
%% uses gen_server2's backoff hibernation between HIBERNATE_AFTER_MIN
%% and DESIRED_HIBERNATE.
init([]) ->
    {ok, #gstate { waiting_on = sets:new(), results = queue:new(),
                   blocking = queue:new() }, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
%% wait_on: register one more outstanding producer token.
handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) ->
    {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) },
     hibernate};

%% finished: discard the token. If that was the last outstanding
%% producer, no results are queued, and at least one fetcher is blocked,
%% stop normally -- terminate/2 then replies 'finished' to every blocked
%% fetcher. Otherwise just acknowledge and keep running.
handle_call({finished, Token}, _From,
            State = #gstate { waiting_on = Tokens, results = Results,
                              blocking = Blocking }) ->
    Tokens1 = sets:del_element(Token, Tokens),
    State1 = State #gstate { waiting_on = Tokens1 },
    case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso
        not queue:is_empty(Blocking) of
        true -> {stop, normal, ok, State1};
        false -> {reply, ok, State1, hibernate}
    end;

%% fetch: hand back a queued result if one exists. With no results
%% queued: if no producers are outstanding, reply 'finished' and stop;
%% otherwise park the caller on the blocking queue (noreply) -- it is
%% answered later via gen_server2:reply/2 from handle_cast({produce,_})
%% or from terminate/2.
handle_call(fetch, From, State =
            #gstate { blocking = Blocking, results = Results,
                      waiting_on = Tokens }) ->
    case queue:out(Results) of
        {empty, _Results} ->
            case sets:size(Tokens) of
                0 -> {stop, normal, finished, State};
                _ -> {noreply,
                      State #gstate { blocking = queue:in(From, Blocking) },
                      hibernate}
            end;
        {{value, Result}, Results1} ->
            {reply, {value, Result}, State #gstate { results = Results1 },
             hibernate}
    end;

%% Any other call is a protocol violation: crash the gatherer so the
%% fault is visible (the caller exits with this stop reason).
handle_call(Msg, _From, State) ->
    {stop, {unexpected_call, Msg}, State}.
+
%% produce: deliver Result to the longest-waiting blocked fetcher if
%% there is one (replying on its behalf via gen_server2:reply/2);
%% otherwise append it to the results queue for a future fetch.
handle_cast({produce, Result}, State = #gstate { blocking = Blocking,
                                                 results = Results }) ->
    State1 = case queue:out(Blocking) of
                 {empty, _} ->
                     State #gstate { results = queue:in(Result, Results) };
                 {{value, Blocked}, Blocking1} ->
                     gen_server2:reply(Blocked, {value, Result}),
                     State #gstate { blocking = Blocking1 }
             end,
    {noreply, State1, hibernate};

%% Any other cast is a protocol violation: crash the gatherer.
handle_cast(Msg, State) ->
    {stop, {unexpected_cast, Msg}, State}.
+
%% The gatherer expects no raw messages; receiving one is a protocol
%% violation, so stop with a descriptive reason.
handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.
+
%% No state migration is required across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
+
%% On shutdown, unblock every parked fetcher by replying 'finished', so
%% no caller is left hanging inside gen_server2:call/3. The return value
%% of terminate/2 is not used by gen_server2; State is returned exactly
%% as the original implementation did.
terminate(_Reason, State = #gstate { blocking = Blocking }) ->
    ok = lists:foreach(
           fun (Blocked) -> gen_server2:reply(Blocked, finished) end,
           queue:to_list(Blocking)),
    State.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 5610b35e34..82a9ddd71d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,8 @@
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
sync/3, client_init/1, client_terminate/1, clean/2]).
--export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal
+-export([sync/1, gc_done/4, set_maximum_since_use/2,
+ build_index_worker/6]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
@@ -467,8 +468,6 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
%%----------------------------------------------------------------------------
init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
- process_flag(trap_exit, true),
-
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
@@ -527,6 +526,8 @@ init([Server, BaseDir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
+ process_flag(trap_exit, true),
+
{ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
FileSummaryEts),
@@ -1182,23 +1183,44 @@ find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
-build_index([], State) ->
- build_index(undefined, [State #msstate.current_file], State);
build_index(Files, State) ->
- {Offset, State1} = build_index(undefined, Files, State),
- {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
-
-build_index(Left, [], State = #msstate { file_summary_ets = FileSummaryEts }) ->
- ok = index_delete_by_file(undefined, State),
- Offset = case ets:lookup(FileSummaryEts, Left) of
- [] -> 0;
- [#file_summary { file_size = FileSize }] -> FileSize
- end,
- {Offset, State #msstate { current_file = Left }};
-build_index(Left, [File|Files],
- State = #msstate { dir = Dir, sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- file_summary_ets = FileSummaryEts }) ->
+ {ok, Pid} = gatherer:start_link(),
+ case Files of
+ [] -> build_index(Pid, undefined, [State #msstate.current_file], State);
+ _ -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}
+ end.
+
+build_index(Gatherer, Left, [],
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
+ case gatherer:fetch(Gatherer) of
+ finished ->
+ ok = index_delete_by_file(undefined, State),
+ Offset = case ets:lookup(FileSummaryEts, Left) of
+ [] -> 0;
+ [#file_summary { file_size = FileSize }] -> FileSize
+ end,
+ {Offset, State #msstate { current_file = Left }};
+ {value, FileSummary =
+ #file_summary { valid_total_size = ValidTotalSize,
+ file_size = FileSize }} ->
+ true = ets:insert_new(FileSummaryEts, FileSummary),
+ build_index(Gatherer, Left, [],
+ State #msstate {
+ sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize })
+ end;
+build_index(Gatherer, Left, [File|Files], State) ->
+ Child = make_ref(),
+ ok = gatherer:wait_on(Gatherer, Child),
+ ok = worker_pool:submit_async({?MODULE, build_index_worker,
+ [Gatherer, Child, State, Left, File, Files]}),
+ build_index(Gatherer, File, Files, State).
+
+build_index_worker(
+ Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) ->
{ok, Messages, FileSize} =
rabbit_msg_store_misc:scan_file_for_valid_messages(
Dir, rabbit_msg_store_misc:filenum_to_name(File)),
@@ -1231,15 +1253,12 @@ build_index(Left, [File|Files],
end};
[F|_] -> {F, FileSize}
end,
- true =
- ets:insert_new(FileSummaryEts, #file_summary {
- file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop, locked = false,
- left = Left, right = Right, file_size = FileSize1,
- readers = 0 }),
- build_index(File, Files,
- State #msstate { sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize1 }).
+ ok = gatherer:produce(Gatherer, #file_summary {
+ file = File, valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop, locked = false,
+ left = Left, right = Right, file_size = FileSize1,
+ readers = 0 }),
+ ok = gatherer:finished(Gatherer, Guid).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 935f275451..2a94adf77f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -422,48 +422,45 @@ start_persistent_msg_store(DurableQueues) ->
%%----------------------------------------------------------------------------
queue_index_walker(DurableQueues) when is_list(DurableQueues) ->
- queue_index_walker({DurableQueues, sets:new()});
-
-queue_index_walker({[], Kids}) ->
- case sets:size(Kids) of
- 0 -> finished;
- _ -> receive
- {found, MsgId, Count} ->
- {MsgId, Count, {[], Kids}};
- {finished, Child} ->
- queue_index_walker({[], sets:del_element(Child, Kids)})
- end
+ {ok, Pid} = gatherer:start_link(),
+ queue_index_walker({DurableQueues, Pid});
+
+queue_index_walker({[], Gatherer}) ->
+ case gatherer:fetch(Gatherer) of
+ finished -> finished;
+ {value, {MsgId, Count}} -> {MsgId, Count, {[], Gatherer}}
end;
-queue_index_walker({[QueueName | QueueNames], Kids}) ->
+queue_index_walker({[QueueName | QueueNames], Gatherer}) ->
Child = make_ref(),
+ ok = gatherer:wait_on(Gatherer, Child),
ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader,
- [QueueName, self(), Child]}),
- queue_index_walker({QueueNames, sets:add_element(Child, Kids)}).
+ [QueueName, Gatherer, Child]}),
+ queue_index_walker({QueueNames, Gatherer}).
-queue_index_walker_reader(QueueName, Parent, Guid) ->
+queue_index_walker_reader(QueueName, Gatherer, Guid) ->
State = blank_state(QueueName),
State1 = load_journal(State),
SegNums = all_segment_nums(State1),
- queue_index_walker_reader(Parent, Guid, State1, SegNums).
+ queue_index_walker_reader(Gatherer, Guid, State1, SegNums).
-queue_index_walker_reader(Parent, Guid, State, []) ->
+queue_index_walker_reader(Gatherer, Guid, State, []) ->
_State = terminate(false, State),
- Parent ! {finished, Guid};
-queue_index_walker_reader(Parent, Guid, State, [Seg | SegNums]) ->
+ ok = gatherer:finished(Gatherer, Guid);
+queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) ->
SeqId = reconstruct_seq_id(Seg, 0),
{Messages, State1} = read_segment_entries(SeqId, State),
- queue_index_walker_reader(Parent, Guid, SegNums, State1, Messages).
+ State2 = queue_index_walker_reader1(Gatherer, State1, Messages),
+ queue_index_walker_reader(Gatherer, Guid, State2, SegNums).
-queue_index_walker_reader(Parent, Guid, SegNums, State, []) ->
- queue_index_walker_reader(Parent, Guid, State, SegNums);
-queue_index_walker_reader(
- Parent, Guid, SegNums, State,
- [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
+queue_index_walker_reader1(_Gatherer, State, []) ->
+ State;
+queue_index_walker_reader1(
+ Gatherer, State, [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
case IsPersistent of
- true -> Parent ! {found, MsgId, 1};
+ true -> gatherer:produce(Gatherer, {MsgId, 1});
false -> ok
end,
- queue_index_walker_reader(Parent, Guid, SegNums, State, Msgs).
+ queue_index_walker_reader1(Gatherer, State, Msgs).
%%----------------------------------------------------------------------------
%% Minors