summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-07 14:49:54 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-07 14:49:54 +0100
commit87ab69f95a5e1ad5c84bcb7dbed84c798f1cb3c4 (patch)
tree767b1d1bcae44b5edb8669ca11f798935196fe59 /src
parent4cc02485583ba5104de6b34eee8c6f4ab83f2eaf (diff)
parent81fa43bd8070a377c5bb0efce9f8bc63864f6b8e (diff)
downloadrabbitmq-server-git-87ab69f95a5e1ad5c84bcb7dbed84c798f1cb3c4.tar.gz
Merging default into bug 22500
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_control.erl82
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_multi.erl13
-rw-r--r--src/worker_pool.erl137
-rw-r--r--src/worker_pool_sup.erl69
-rw-r--r--src/worker_pool_worker.erl94
7 files changed, 323 insertions, 96 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 700acede24..b120499739 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -51,27 +51,39 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
- {enables, kernel_ready}]}).
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({worker_pool,
+ [{description, "worker pool"},
+ {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({external_infrastructure,
+ [{description, "external infrastructure ready"}]}).
-rabbit_boot_step({rabbit_exchange_type_registry,
[{description, "exchange type registry"},
{mfa, {rabbit_sup, start_child,
[rabbit_exchange_type_registry]}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_log]}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({kernel_ready,
- [{description, "kernel ready"}]}).
+ [{description, "kernel ready"},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_alarm,
[{description, "alarm handler"},
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6aac442888..d1834b3b73 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -46,6 +46,7 @@
-spec(stop/0 :: () -> 'ok').
-spec(action/4 :: (atom(), erlang_node(), [string()],
fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(usage/0 :: () -> no_return()).
-endif.
@@ -130,86 +131,7 @@ stop() ->
ok.
usage() ->
- io:format("Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...]
-
-Available commands:
-
- stop - stops the RabbitMQ application and halts the node
- stop_app - stops the RabbitMQ application, leaving the node running
- start_app - starts the RabbitMQ application on an already-running node
- reset - resets node to default configuration, deleting all data
- force_reset
- cluster <ClusterNode> ...
- status
- rotate_logs [Suffix]
- close_connection <ConnectionPid> <ExplanationString>
-
- add_user <UserName> <Password>
- delete_user <UserName>
- change_password <UserName> <NewPassword>
- list_users
-
- add_vhost <VHostPath>
- delete_vhost <VHostPath>
- list_vhosts
-
- set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
- clear_permissions [-p <VHostPath>] <UserName>
- list_permissions [-p <VHostPath>]
- list_user_permissions <UserName>
-
- list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
- list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
- list_bindings [-p <VHostPath>]
- list_connections [<ConnectionInfoItem> ...]
- list_channels [<ChannelInfoItem> ...]
- list_consumers [-p <VHostPath>]
-
-Quiet output mode is selected with the \"-q\" flag. Informational
-messages are suppressed when quiet mode is in effect.
-
-<node> should be the name of the master node of the RabbitMQ
-cluster. It defaults to the node named \"rabbit\" on the local
-host. On a host named \"server.example.com\", the master node will
-usually be rabbit@server (unless RABBITMQ_NODENAME has been set to
-some non-default value at broker startup time). The output of hostname
--s is usually the correct suffix to use after the \"@\" sign.
-
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results. The
-default value is \"/\".
-
-<QueueInfoItem> must be a member of the list [name, durable,
-auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid,
-exclusive_consumer_tag, messages_ready, messages_unacknowledged,
-messages_uncommitted, messages, acks_uncommitted, consumers,
-transactions, memory]. The default is to display name and (number of)
-messages.
-
-<ExchangeInfoItem> must be a member of the list [name, type, durable,
-auto_delete, arguments]. The default is to display name and type.
-
-The output format for \"list_bindings\" is a list of rows containing
-exchange name, queue name, routing key and arguments, in that order.
-
-<ConnectionInfoItem> must be a member of the list [pid, address, port,
-peer_address, peer_port, state, channels, user, vhost, timeout,
-frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
-send_pend]. The default is to display user, peer_address, peer_port
-and state.
-
-<ChannelInfoItem> must be a member of the list [pid, connection,
-number, user, vhost, transactional, consumer_count,
-messages_unacknowledged, acks_uncommitted, prefetch_count]. The
-default is to display pid, user, transactional, consumer_count,
-messages_unacknowledged.
-
-The output format for \"list_consumers\" is a list of rows containing,
-in order, the queue name, channel process id, consumer tag, and a
-boolean indicating whether acknowledgements are expected from the
-consumer.
-
-"),
+ io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
action(stop, Node, [], Inform) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9abc1695ae..81cecb38f3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case mnesia:sync_transaction(TxFun) of
+ case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 7c56ae3dda..336f74bf9a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -42,6 +42,7 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
+-spec(usage/0 :: () -> no_return()).
-endif.
@@ -86,16 +87,8 @@ stop() ->
ok.
usage() ->
- io:format("Usage: rabbitmq-multi <command>
-
-Available commands:
-
- start_all <NodeCount> - start a local cluster of RabbitMQ nodes.
- status - print status of all running nodes
- stop_all - stops all local RabbitMQ nodes.
- rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes.
-"),
- halt(3).
+ io:format("~s", [rabbit_multi_usage:usage()]),
+ halt(1).
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
new file mode 100644
index 0000000000..1ee958afb6
--- /dev/null
+++ b/src/worker_pool.erl
@@ -0,0 +1,137 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool).
+
+%% Generic worker pool manager.
+%%
+%% Supports nested submission of jobs (nested jobs always run
+%% immediately in current worker process).
+%%
+%% Possible future enhancements:
+%%
+%% 1. Allow priorities (basically, change the pending queue to a
+%% priority_queue).
+%%
+%% 2. Allow the submission to the pool_worker to be async.
+
+-behaviour(gen_server2).
+
+-export([start_link/0, submit/1, idle/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(state, { available, pending }).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+submit(Fun) ->
+ case get(worker_pool_worker) of
+ true -> worker_pool_worker:run(Fun);
+ _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ worker_pool_worker:submit(Pid, Fun)
+ end.
+
+idle(WId) ->
+ gen_server2:cast(?SERVER, {idle, WId}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(next_free, From, State = #state { available = Avail,
+ pending = Pending }) ->
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ {noreply, State #state { pending = queue:in(From, Pending) },
+ hibernate};
+ {{value, WId}, Avail1} ->
+ {reply, get_worker_pid(WId), State #state { available = Avail1 },
+ hibernate}
+ end;
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast({idle, WId}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply, case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = queue:in(WId, Avail) };
+ {{value, From}, Pending1} ->
+ gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 }
+ end, hibernate};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+get_worker_pid(WId) ->
+ [{WId, Pid, _Type, _Modules} | _] =
+ lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
+ when Id =:= WId -> false;
+ (_) -> true
+ end,
+ supervisor:which_children(worker_pool_sup)),
+ Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
new file mode 100644
index 0000000000..4ded63a8db
--- /dev/null
+++ b/src/worker_pool_sup.erl
@@ -0,0 +1,69 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ start_link(erlang:system_info(schedulers)).
+
+start_link(WCount) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]).
+
+%%----------------------------------------------------------------------------
+
+init([WCount]) ->
+ {ok, {{one_for_one, 10, 10},
+ [{worker_pool, {worker_pool, start_link, []}, transient,
+ 16#ffffffff, worker, [worker_pool]} |
+ [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
new file mode 100644
index 0000000000..3bfcc2d9ff
--- /dev/null
+++ b/src/worker_pool_worker.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_worker).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, submit/2, run/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link(WId) ->
+ gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+
+submit(Pid, Fun) ->
+ gen_server2:call(Pid, {submit, Fun}, infinity).
+
+init([WId]) ->
+ ok = worker_pool:idle(WId),
+ put(worker_pool_worker, true),
+ {ok, WId, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({submit, Fun}, From, WId) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, WId, hibernate};
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().