diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-01 12:00:26 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-01 12:00:26 +0100 |
| commit | 7c691d118046c391dd03da7400aac4b371aa7b97 (patch) | |
| tree | dba61b671d8e3aff34779c8ccec2cbd114ca1c63 | |
| parent | cd1f96394e75496569201206a2db2ad80a9a1f83 (diff) | |
| parent | 11783f7e7dd9e0a1382826093680c338664e4c57 (diff) | |
| download | rabbitmq-server-git-7c691d118046c391dd03da7400aac4b371aa7b97.tar.gz | |
Merging in from default
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | docs/examples-to-end.xsl | 13 | ||||
| -rw-r--r-- | docs/rabbitmq.conf.5.xml | 12 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
| -rw-r--r-- | docs/usage.xsl | 10 | ||||
| -rw-r--r-- | src/rabbit.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 3 | ||||
| -rw-r--r-- | src/worker_pool.erl | 135 | ||||
| -rw-r--r-- | src/worker_pool_sup.erl | 69 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 94 |
12 files changed, 345 insertions, 24 deletions
@@ -202,7 +202,7 @@ distclean: clean # xmlto can not read from standard input, so we mess with a tmp file. %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) $<.tmp && \ + xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index b63ffcb3c9..496fcc1c34 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -8,7 +8,7 @@ <xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> -<!-- Don't copy exmaples through in place --> +<!-- Don't copy examples through in place --> <xsl:template match="*[@role='example-prefix']"/> <xsl:template match="*[@role='example']"/> @@ -25,6 +25,7 @@ </xsl:for-each> <refsect1> <title>Examples</title> +<xsl:if test="//screen[@role='example']"> <variablelist> <xsl:for-each select="//screen[@role='example']"> <varlistentry> @@ -35,6 +36,16 @@ </varlistentry> </xsl:for-each> </variablelist> +</xsl:if> +<!-- +We need to handle multiline examples separately, since not using a +variablelist leads to slightly less nice formatting (the explanation doesn't get +indented) +--> +<xsl:for-each select="//screen[@role='example-multiline']"> +<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen> +<xsl:copy-of select="following-sibling::para[@role='example']"/> +</xsl:for-each> </refsect1> </refentry> </xsl:template> diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml index dcb1e49c68..34f20f9226 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq.conf.5.xml @@ -9,7 +9,7 @@ </refentryinfo> <refmeta> - <refentrytitle>/etc/rabbitmq/rabbitmq.conf</refentrytitle> + <refentrytitle>rabbitmq.conf</refentrytitle> <manvolnum>5</manvolnum> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> </refmeta> @@ -59,11 +59,11 @@ environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: <filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. </para> <para role="example-prefix">For example:</para> - <screen role="example"> - # I am a complete /etc/rabbitmq/rabbitmq.conf file. - # Comment lines start with a hash character. - # This is a /bin/sh script file - use ordinary envt var syntax - NODENAME=hare + <screen role="example-multiline"> +# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# Comment lines start with a hash character. +# This is a /bin/sh script file - use ordinary envt var syntax +NODENAME=hare </screen> <para role="example"> This is an example of a complete diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index e7fc45e4ba..7634b2d247 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -65,7 +65,7 @@ <title>Options</title> <variablelist> <varlistentry> - <term><option>-n</option> <replaceable>node</replaceable></term> + <term><cmdsynopsis><arg choice="opt">-n <replaceable>node</replaceable></arg></cmdsynopsis></term> <listitem> <para role="usage"> Default node is "rabbit@server", where server is the local host. On @@ -79,7 +79,7 @@ </listitem> </varlistentry> <varlistentry> - <term><option>-q</option></term> + <term><cmdsynopsis><arg choice="opt">-q</arg></cmdsynopsis></term> <listitem> <para role="usage"> Quiet output mode is selected with the "-q" flag. Informational diff --git a/docs/usage.xsl b/docs/usage.xsl index 841b2a8461..72f8880ab1 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -12,19 +12,17 @@ encoding="UTF-8" indent="no"/> <xsl:strip-space elements="*"/> -<xsl:preserve-space elements="term" /> +<xsl:preserve-space elements="cmdsynopsis arg" /> <xsl:template match="/"> <!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit! -module(<xsl:value-of select="$modulename" />). -export([usage/0]). -usage() -> io:format(%QUOTE%Usage: +usage() -> %QUOTE%Usage: <xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> <xsl:text> </xsl:text> <xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg"> - <xsl:if test="@choice='opt'">[</xsl:if> <xsl:apply-templates select="." /> - <xsl:if test="@choice='opt'">]</xsl:if> <xsl:text> </xsl:text> </xsl:for-each> @@ -60,7 +58,7 @@ usage() -> io:format(%QUOTE%Usage: </xsl:for-each> <xsl:apply-templates select=".//*[title='Commands']/refsect2" mode="command-usage" /> -%QUOTE%), halt(1). +%QUOTE%. </xsl:template> <!-- Option lists in command usage --> @@ -74,7 +72,7 @@ usage() -> io:format(%QUOTE%Usage: <!-- Don't show anything else in command usage --> <xsl:template match="text()" mode="command-usage"/> -<xsl:template match="option">[<xsl:apply-templates/>]</xsl:template> +<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template> <xsl:template match="replaceable"><<xsl:value-of select="."/>></xsl:template> </xsl:stylesheet> diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede24..b120499739 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,27 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). -rabbit_boot_step({rabbit_exchange_type_registry, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index f2f291692a..d1834b3b73 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -131,7 +131,8 @@ stop() -> ok. usage() -> - rabbit_ctl_usage:usage(). + io:format("~s", [rabbit_ctl_usage:usage()]), + halt(1). action(stop, Node, [], Inform) -> Inform("Stopping and halting node ~p", [Node]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695ae..81cecb38f3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 9ff2c5cb11..336f74bf9a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -87,7 +87,8 @@ stop() -> ok. usage() -> - rabbit_multi_usage:usage(). + io:format("~s", [rabbit_multi_usage:usage()]), + halt(1). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 0000000000..b883d4f0cb --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). +%% +%% 2. Allow the submission to the pool_worker to be async. + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, State #state { pending = queue:in(From, Pending) }}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, From}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 } + end}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 0000000000..4ded63a8db --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 0000000000..fc3ce3714f --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |
