diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-07 14:49:54 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-07 14:49:54 +0100 |
| commit | 87ab69f95a5e1ad5c84bcb7dbed84c798f1cb3c4 (patch) | |
| tree | 767b1d1bcae44b5edb8669ca11f798935196fe59 /src | |
| parent | 4cc02485583ba5104de6b34eee8c6f4ab83f2eaf (diff) | |
| parent | 81fa43bd8070a377c5bb0efce9f8bc63864f6b8e (diff) | |
| download | rabbitmq-server-git-87ab69f95a5e1ad5c84bcb7dbed84c798f1cb3c4.tar.gz | |
Merging default into bug 22500
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 82 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 13 | ||||
| -rw-r--r-- | src/worker_pool.erl | 137 | ||||
| -rw-r--r-- | src/worker_pool_sup.erl | 69 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 94 |
7 files changed, 323 insertions, 96 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede24..b120499739 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,27 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). -rabbit_boot_step({rabbit_exchange_type_registry, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6aac442888..d1834b3b73 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -46,6 +46,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(action/4 :: (atom(), erlang_node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -130,86 +131,7 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...] - -Available commands: - - stop - stops the RabbitMQ application and halts the node - stop_app - stops the RabbitMQ application, leaving the node running - start_app - starts the RabbitMQ application on an already-running node - reset - resets node to default configuration, deleting all data - force_reset - cluster <ClusterNode> ... - status - rotate_logs [Suffix] - close_connection <ConnectionPid> <ExplanationString> - - add_user <UserName> <Password> - delete_user <UserName> - change_password <UserName> <NewPassword> - list_users - - add_vhost <VHostPath> - delete_vhost <VHostPath> - list_vhosts - - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> - clear_permissions [-p <VHostPath>] <UserName> - list_permissions [-p <VHostPath>] - list_user_permissions <UserName> - - list_queues [-p <VHostPath>] [<QueueInfoItem> ...] - list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] - list_bindings [-p <VHostPath>] - list_connections [<ConnectionInfoItem> ...] - list_channels [<ChannelInfoItem> ...] - list_consumers [-p <VHostPath>] - -Quiet output mode is selected with the \"-q\" flag. Informational -messages are suppressed when quiet mode is in effect. - -<node> should be the name of the master node of the RabbitMQ -cluster. It defaults to the node named \"rabbit\" on the local -host. On a host named \"server.example.com\", the master node will -usually be rabbit@server (unless RABBITMQ_NODENAME has been set to -some non-default value at broker startup time). The output of hostname --s is usually the correct suffix to use after the \"@\" sign. - -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results. The -default value is \"/\". - -<QueueInfoItem> must be a member of the list [name, durable, -auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid, -exclusive_consumer_tag, messages_ready, messages_unacknowledged, -messages_uncommitted, messages, acks_uncommitted, consumers, -transactions, memory]. The default is to display name and (number of) -messages. - -<ExchangeInfoItem> must be a member of the list [name, type, durable, -auto_delete, arguments]. The default is to display name and type. - -The output format for \"list_bindings\" is a list of rows containing -exchange name, queue name, routing key and arguments, in that order. - -<ConnectionInfoItem> must be a member of the list [pid, address, port, -peer_address, peer_port, state, channels, user, vhost, timeout, -frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, -send_pend]. The default is to display user, peer_address, peer_port -and state. - -<ChannelInfoItem> must be a member of the list [pid, connection, -number, user, vhost, transactional, consumer_count, -messages_unacknowledged, acks_uncommitted, prefetch_count]. The -default is to display pid, user, transactional, consumer_count, -messages_unacknowledged. - -The output format for \"list_consumers\" is a list of rows containing, -in order, the queue name, channel process id, consumer tag, and a -boolean indicating whether acknowledgements are expected from the -consumer. - -"), + io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). action(stop, Node, [], Inform) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695ae..81cecb38f3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7c56ae3dda..336f74bf9a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -42,6 +42,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -86,16 +87,8 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmq-multi <command> - -Available commands: - - start_all <NodeCount> - start a local cluster of RabbitMQ nodes. - status - print status of all running nodes - stop_all - stops all local RabbitMQ nodes. - rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes. -"), - halt(3). + io:format("~s", [rabbit_multi_usage:usage()]), + halt(1). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 0000000000..1ee958afb6 --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). +%% +%% 2. Allow the submission to the pool_worker to be async. + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, State #state { pending = queue:in(From, Pending) }, + hibernate}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }, + hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, From}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 0000000000..4ded63a8db --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 0000000000..3bfcc2d9ff --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |
