diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-10 17:41:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-10 17:41:29 +0100 |
| commit | 9c358627c4529a60c2c48b9af8b6443cc5948527 (patch) | |
| tree | e4de9baf8bd63a15549972d894215a988356cbb2 | |
| parent | 792d203882a9911a83c1baa396fec71d8f730670 (diff) | |
| parent | 4d019b5fc64e7643b76d691fdc084633a5bd4b58 (diff) | |
| download | rabbitmq-server-git-9c358627c4529a60c2c48b9af8b6443cc5948527.tar.gz | |
merge in from default. All tests seem to pass.
| -rw-r--r-- | .hgignore | 5 | ||||
| -rw-r--r-- | Makefile | 8 | ||||
| -rw-r--r-- | ebin/rabbit.rel | 7 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
| -rwxr-xr-x | scripts/activate-plugins | 47 | ||||
| -rw-r--r-- | scripts/activate-plugins.bat | 53 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 13 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server.bat | 17 | ||||
| -rw-r--r-- | src/gen_server2.erl | 209 | ||||
| -rw-r--r-- | src/rabbit.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 198 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 36 |
18 files changed, 530 insertions, 144 deletions
@@ -10,6 +10,11 @@ syntax: regexp ^src/rabbit_framing.erl$ ^rabbit.plt$ ^ebin/rabbit.app$ +^ebin/rabbit.rel$ +^ebin/rabbit.boot$ +^ebin/rabbit.script$ +^plugins/ +^priv/plugins/ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -66,13 +66,13 @@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN dialyze: $(BEAM_TARGETS) dialyzer -c $? -clean: cleandb +clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz -cleandb: stop-node +cleandb: rm -rf $(RABBITMQ_MNESIA_DIR)/* ############ various tasks to interact with RabbitMQ ################### @@ -98,7 +98,7 @@ run-node: all run-tests: all echo "rabbit_tests:all_tests()." | $(ERL_CALL) -start-background-node: stop-node +start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ @@ -134,7 +134,7 @@ srcdist: distclean cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in + sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) diff --git a/ebin/rabbit.rel b/ebin/rabbit.rel deleted file mode 100644 index c2d2067b9c..0000000000 --- a/ebin/rabbit.rel +++ /dev/null @@ -1,7 +0,0 @@ -{release, {"rabbit", "1.1.0-alpha"}, {erts, "1.14.2"}, - [{rabbit, "1.1.0-alpha"}, - {mnesia, "4.3.4"}, - {os_mon, "2.1.2"}, - {sasl, "2.1.5"}, - {stdlib, "1.14.4"}, - {kernel, "2.11.4"}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 8e1c890eb2..0057ea0478 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -1,7 +1,7 @@ {application, rabbit, %% -*- erlang -*- [{description, "RabbitMQ"}, {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, + {vsn, "%%VSN%%"}, {modules, []}, {registered, [rabbit_amqqueue_sup, rabbit_log, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 9e7c4bfb69..eb953b81fa 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -34,11 +34,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} - -# The rabbitmq build needs escript, which is missing from /usr/bin in -# some versions of the erlang RPM. See -# <https://bugzilla.redhat.com/show_bug.cgi?id=481302> -PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags} +make %{?_smp_mflags} %install rm -rf %{buildroot} diff --git a/scripts/activate-plugins b/scripts/activate-plugins new file mode 100755 index 0000000000..52f7ddbe61 --- /dev/null +++ b/scripts/activate-plugins @@ -0,0 +1,47 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf + +RABBITMQ_EBIN=`dirname $0`/../ebin +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins" +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins" + +exec erl \ + -pa "$RABBITMQ_EBIN" \ + -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ + -noinput \ + -hidden \ + -s rabbit_plugin_activator \ + -extra "$@" diff --git a/scripts/activate-plugins.bat b/scripts/activate-plugins.bat new file mode 100644 index 0000000000..8bef4ad266 --- /dev/null +++ b/scripts/activate-plugins.bat @@ -0,0 +1,53 @@ +@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+if "%ERLANG_HOME%"=="" (
+ set ERLANG_HOME=%~dp0%..\..\..
+)
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
+set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 70e0c66bcd..2883df9da3 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -75,16 +75,25 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin" +if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then + RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" + RABBITMQ_EBIN_PATH="" +else + RABBITMQ_BOOT_FILE=start_sasl + RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}" +fi + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + ${RABBITMQ_EBIN_PATH} \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ - -boot start_sasl \ + -boot ${RABBITMQ_BOOT_FILE} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 22dc10c605..3b6e493880 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -84,10 +84,10 @@ set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%" set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+ type %LOGS% >> %LOGS_BACKUP%
)
if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+ type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
)
rem End of log management
@@ -104,11 +104,20 @@ set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%" if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
)
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot start_sasl ^
+-boot %RABBITMQ_BOOT_FILE% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 529ed0295e..36fb4fa8c3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -21,34 +21,41 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% -%% 5) init can return a 4th arg, {backoff, InitialTimeout, +%% 5) The callback module can optionally implement +%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be +%% called immediately prior to and post hibernation, respectively. If +%% handle_pre_hibernate returns {hibernate, NewState} then the process +%% will hibernate. If the module does not implement +%% handle_pre_hibernate/1 then the default action is to hibernate. +%% +%% 6) init can return a 4th arg, {backoff, InitialTimeout, %% MinimumTimeout, DesiredHibernatePeriod} (all in %% milliseconds). Then, on all callbacks which can return a timeout %% (including init), timeout can be 'hibernate'. When this is the %% case, the current timeout value will be used (initially, the %% InitialTimeout supplied from init). After this timeout has -%% occurred, handle_pre_hibernate/1 will be called. If that returns -%% {hibernate, State} then the process will be hibernated. Upon -%% awaking, a new current timeout value will be calculated, and then -%% handle_post_hibernate/1 will be called. The purpose is that the -%% gen_server2 takes care of adjusting the current timeout value such -%% that the process will increase the timeout value repeatedly if it -%% is unable to sleep for the DesiredHibernatePeriod. If it is able to -%% sleep for the DesiredHibernatePeriod it will decrease the current -%% timeout down to the MinimumTimeout, so that the process is put to -%% sleep sooner (and hopefully for longer). In short, should a process +%% occurred, hibernation will occur as normal. Upon awaking, a new +%% current timeout value will be calculated. +%% +%% The purpose is that the gen_server2 takes care of adjusting the +%% current timeout value such that the process will increase the +%% timeout value repeatedly if it is unable to sleep for the +%% DesiredHibernatePeriod. If it is able to sleep for the +%% DesiredHibernatePeriod it will decrease the current timeout down to +%% the MinimumTimeout, so that the process is put to sleep sooner (and +%% hopefully stays asleep for longer). In short, should a process %% using this receive a burst of messages, it should not hibernate %% between those messages, but as the messages become less frequent, %% the process will not only hibernate, it will do so sooner after %% each message. %% -%% Normal timeout values (i.e. not 'hibernate') can still be used, and -%% if they are used then the handle_info(timeout, State) will be -%% called as normal. In this case, returning 'hibernate' from -%% handle_info(timeout, State) will not hibernate the process -%% immediately, as it would if backoff wasn't being used. Instead -%% it'll wait for the current timeout as described above, before -%% calling handle_pre_hibernate(State). +%% When using this backoff mechanism, normal timeout values (i.e. not +%% 'hibernate') can still be used, and if they are used then the +%% handle_info(timeout, State) will be called as normal. In this case, +%% returning 'hibernate' from handle_info(timeout, State) will not +%% hibernate the process immediately, as it would if backoff wasn't +%% being used. Instead it'll wait for the current timeout as described +%% above. %% All modifications are (C) 2009 LShift Ltd. @@ -157,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7, wake_hib/8]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -345,8 +352,7 @@ enter_loop(Mod, Options, State) -> enter_loop(Mod, Options, State, self(), infinity, undefined). enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - Backoff1 = extend_backoff(Mod, Backoff), - enter_loop(Mod, Options, State, self(), infinity, Backoff1); + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> enter_loop(Mod, Options, State, ServerName, infinity, undefined); @@ -355,8 +361,7 @@ enter_loop(Mod, Options, State, Timeout) -> enter_loop(Mod, Options, State, self(), Timeout, undefined). enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - Backoff1 = extend_backoff(Mod, Backoff), - enter_loop(Mod, Options, State, ServerName, infinity, Backoff1); + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> enter_loop(Mod, Options, State, ServerName, Timeout, undefined). @@ -366,7 +371,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug). + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -393,8 +399,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> proc_lib:init_ack(Starter, {ok, self()}), loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), proc_lib:init_ack(Starter, {ok, self()}), - Backoff1 = extend_backoff(Mod, Backoff), loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the @@ -433,11 +439,10 @@ unregister_name({global,Name}) -> unregister_name(Pid) when is_pid(Pid) -> Pid. -extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> - Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1), - Post = erlang:function_exported(Mod, handle_post_hibernate, 1), - random:seed(now()), %% call before we get into the loop - {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}. +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. %%%======================================================================== %%% Internal functions @@ -446,8 +451,7 @@ extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> %%% The MAIN loop. %%% --------------------------------------------------- loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - proc_lib:hibernate(?MODULE,wake_hib, - [Parent, Name, State, Mod, undefined, Queue, Debug]); + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, drain(Queue), Debug). @@ -466,29 +470,29 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of - {hibernate, - {backoff, Current, _Min, _Desired, _Pre, _Post}} -> + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> {Current, true}; {hibernate, _} -> %% wake_hib/7 will set Time to hibernate. If %% we were woken and didn't receive a msg %% then we will get here and need a sensible %% value for Time1, otherwise we crash. - %% On the grounds that it's better to get - %% control back to the user module sooner - %% rather than later, 0 is more sensible - %% than infinity here. - {0, false}; + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; _ -> {Time, false} end, receive Input -> - loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue1), Debug) + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) after Time1 -> case HibOnTimeout of true -> - backoff_pre_hibernate( + pre_hibernate( Parent, Name, State, Mod, TimeoutState, Queue1, Debug); false -> @@ -499,43 +503,66 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> end end. -wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState, - drain(Queue), Debug). - -wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) -> - backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(), - TimeoutState, drain(Queue), Debug). - -backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState = - {backoff, _Current, _Minimum, _Desired, Pre, _Post}, - Queue, Debug) -> - case Pre of +wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> + TimeoutState1 = case TS of + undefined -> + undefined; + {SleptAt, TimeoutState} -> + adjust_timeout_state(SleptAt, now(), TimeoutState) + end, + post_hibernate(Parent, Name, State, Mod, TimeoutState1, + drain(Queue), Debug). + +hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + TS = case TimeoutState of + undefined -> undefined; + {backoff, _, _, _, _} -> {now(), TimeoutState} + end, + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + TS, Queue, Debug]). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, - Mod, now(), - TimeoutState, Queue, - Debug]); - {stop, Reason, NState} -> - terminate(Reason, Name, pre_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, pre_hibernate, Mod, State, []); + hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, + Debug); + Reply -> + handle_common_termination(Reply, Name, pre_hibernate, + Mod, State, Debug) + end; + false -> + hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + end. + +post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_post_hibernate, 1) of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState, Queue, Debug); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState, Queue, Debug); Reply -> - terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, - State, []) + handle_common_termination(Reply, Name, post_hibernate, + Mod, State, Debug) end; false -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - now(), TimeoutState, Queue, - Debug]) + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState, Queue, Debug) end. -backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - {backoff, CurrentTO, MinimumTO, DesiredHibPeriod, - Pre, Post}, - Queue, Debug) -> +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), CurrentMicros = CurrentTO * 1000, MinimumMicros = MinimumTO * 1000, @@ -548,29 +575,9 @@ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, true -> lists:max([MinimumTO, CurrentTO div 2]); false -> CurrentTO end, - CurrentTO1 = Base + random:uniform(Base), - TimeoutState = - {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post}, - case Post of - true -> - case catch Mod:handle_post_hibernate(State) of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, - Queue, Debug); - {noreply, NState, Time} -> - loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue, - Debug); - {stop, Reason, NState} -> - terminate(Reason, Name, post_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, post_hibernate, Mod, State, []); - Reply -> - terminate({bad_return_value, Reply}, Name, post_hibernate, - Mod, State, []) - end; - false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue, - Debug) - end. + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); @@ -862,12 +869,8 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, []) end. handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, @@ -882,6 +885,12 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, Debug) + end. + +handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> + case Reply of {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> diff --git a/src/rabbit.erl b/src/rabbit.erl index 9587238835..f1dcd51f0d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -213,8 +213,21 @@ log_location(Type) -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n", - [Product, Version, + ProductLen = string:len(Product), + io:format("~n" + "+---+ +---+~n" + "| | | |~n" + "| | | |~n" + "| | | |~n" + "| +---+ +-------+~n" + "| |~n" + "| ~s +---+ |~n" + "| | | |~n" + "| ~s +---+ |~n" + "| |~n" + "+-------------------+~n" + "AMQP ~p-~p~n~s~n~s~n~n", + [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 15c5e90738..50ad102376 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -52,8 +52,6 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(CALL_TIMEOUT, 5000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index f70d6067b3..8adb608fdb 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -35,6 +35,7 @@ -export([publish/1, message/4, message/5, message/6, delivery/4]). -export([properties/1, publish/4, publish/7]). +-export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -57,6 +58,8 @@ -spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), maybe(txn()), properties_input(), binary()) -> publish_result()). +-spec(build_content/2 :: (amqp_properties(), binary()) -> content()). +-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). -endif. @@ -76,6 +79,21 @@ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message}. +build_content(Properties, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + #content{class_id = ClassId, + properties = Properties, + properties_bin = none, + payload_fragments_rev = [BodyBin]}. + +from_content(Content) -> + #content{class_id = ClassId, + properties = Props, + payload_fragments_rev = FragmentsRev} = + rabbit_binary_parser:ensure_content_decoded(Content), + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + {Props, list_to_binary(lists:reverse(FragmentsRev))}. + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()). @@ -84,14 +102,9 @@ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) -> Properties = properties(RawProperties), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = Properties, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, - content = Content, + content = build_content(Properties, BodyBin), guid = MsgId, is_persistent = IsPersistent}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index adf2462dac..3866f0a82c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:cast(Pid, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6985956460..79578000d2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -36,7 +36,7 @@ -record(params, {quiet, node, command, args}). --define(RPC_TIMEOUT, 30000). +-define(RPC_TIMEOUT, infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index fc30834e3a..c328c111f8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -41,6 +41,7 @@ -export([dirty_read/1]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). +-export([enable_cover/1, report_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -90,6 +91,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -193,17 +196,27 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> [Kind, Name, VHostPath])). enable_cover() -> - case cover:compile_beam_directory("ebin") of + enable_cover("."). + +enable_cover([Root]) when is_atom(Root) -> + enable_cover(atom_to_list(Root)); +enable_cover(Root) -> + case cover:compile_beam_directory(filename:join(Root, "ebin")) of {error,Reason} -> {error,Reason}; _ -> ok end. report_cover() -> - Dir = "cover/", - ok = filelib:ensure_dir(Dir), + report_cover("."). + +report_cover([Root]) when is_atom(Root) -> + report_cover(atom_to_list(Root)); +report_cover(Root) -> + Dir = filename:join(Root, "cover"), + ok = filelib:ensure_dir(filename:join(Dir,"junk")), lists:foreach(fun(F) -> file:delete(F) end, - filelib:wildcard(Dir ++ "*.html")), - {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]), + filelib:wildcard(filename:join(Dir, "*.html"))), + {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( fun(M,{CovTot, NotCovTot}) -> @@ -212,7 +225,7 @@ report_cover() -> Cov, NotCov, M), {ok,_} = cover:analyze_to_file( M, - Dir ++ atom_to_list(M) ++ ".html", + filename:join(Dir, atom_to_list(M) ++ ".html"), [html]), {CovTot+Cov, NotCovTot+NotCov} end, diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl new file mode 100644 index 0000000000..71278bfb2a --- /dev/null +++ b/src/rabbit_plugin_activator.erl @@ -0,0 +1,198 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_plugin_activator). + +-export([start/0, stop/0]). + +-define(DefaultPluginDir, "plugins"). +-define(DefaultUnpackedPluginDir, "priv/plugins"). +-define(DefaultRabbitEBin, "ebin"). +-define(BaseApps, [rabbit]). + +%%---------------------------------------------------------------------------- + +start() -> + %% Ensure Rabbit is loaded so we can access it's environment + application:load(rabbit), + + %% Determine our various directories + PluginDir = get_env(plugins_dir, ?DefaultPluginDir), + UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), + RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + + %% Unpack any .ez plugins + unpack_ez_plugins(PluginDir, UnpackedPluginDir), + + %% Build a list of required apps based on the fixed set, and any plugins + RequiredApps = ?BaseApps ++ + find_plugins(PluginDir) ++ + find_plugins(UnpackedPluginDir), + + %% Build the entire set of dependencies - this will load the + %% applications along the way + AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of + {unknown_app, {App, Err}} -> + io:format("ERROR: Failed to load application " ++ + "~s: ~p~n", [App, Err]), + halt(1); + AppList -> + AppList + end, + AppVersions = [determine_version(App) || App <- AllApps], + {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + + %% Build the overall release descriptor + RDesc = {release, + {"rabbit", RabbitVersion}, + {erts, erlang:system_info(version)}, + AppVersions}, + + %% Write it out to ebin/rabbit.rel + file:write_file(RabbitEBin ++ "/rabbit.rel", + io_lib:format("~p.~n", [RDesc])), + + %% Compile the script + case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + {ok, Module, Warnings} -> + %% This gets lots of spurious no-source warnings when we + %% have .ez files, so we want to supress them to prevent + %% hiding real issues. + WarningStr = Module:format_warning( + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + _ -> true + end]), + case length(WarningStr) of + 0 -> ok; + _ -> io:format("~s", [WarningStr]) + end, + ok; + {error, Module, Error} -> + io:format("Boot file generation failed: ~s~n", + [Module:format_error(Error)]), + halt(1) + end, + halt(), + ok. + +stop() -> + ok. + +get_env(Key, Default) -> + case application:get_env(rabbit, Key) of + {ok, V} -> V; + _ -> Default + end. + +determine_version(App) -> + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), + {App, Vsn}. + +assert_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> ok; + false -> + ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) + end. +delete_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> + case file:list_dir(Dir) of + {ok, Files} -> + [case Dir ++ "/" ++ F of + Fn -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of + true -> delete_dir(Fn); + false -> file:delete(Fn) + end + end || F <- Files] + end, + ok = file:del_dir(Dir); + false -> + ok + end. +is_symlink(Name) -> + case file:read_link(Name) of + {ok, _} -> true; + _ -> false + end. + +unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> + %% Eliminate the contents of the destination directory + delete_dir(PluginDestDir), + + assert_dir(PluginDestDir), + [unpack_ez_plugin(PluginName, PluginDestDir) || + PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + +unpack_ez_plugin(PluginFn, PluginDestDir) -> + zip:unzip(PluginFn, [{cwd, PluginDestDir}]), + ok. + +find_plugins(PluginDir) -> + [prepare_dir_plugin(PluginName) || + PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. + +prepare_dir_plugin(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN), + + %% We want the second-last token + NameTokens = string:tokens(PluginAppDescFn,"/."), + PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), + list_to_atom(PluginNameString). + +expand_dependencies(Pending) -> + expand_dependencies(sets:new(), Pending). +expand_dependencies(Current, []) -> + Current; +expand_dependencies(Current, [Next|Rest]) -> + case sets:is_element(Next, Current) of + true -> + expand_dependencies(Current, Rest); + false -> + case application:load(Next) of + ok -> + ok; + {error, {already_loaded, _}} -> + ok; + X -> + throw({unknown_app, {Next, X}}) + end, + {ok, Required} = application:get_key(Next, applications), + Unique = [A || A <- Required, not(sets:is_element(A, Current))], + expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) + end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 9cf9f8aef9..e338ddfe9d 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, - send_command_and_notify/5]). +-export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([send_command/2, send_command/3, send_command_and_signal_back/3, + send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/3, internal_send_command/5]). -import(gen_tcp). @@ -49,8 +49,12 @@ -ifdef(use_specs). -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). +-spec(send_command_and_signal_back/4 :: + (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: @@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) -> channel = Channel, frame_max = FrameMax}]). +start_link(Sock, Channel, FrameMax) -> + spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}]). + mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) @@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax), State; +handle_message({send_command_and_signal_back, MethodRecord, Parent}, + State = #wstate{sock = Sock, channel = Channel}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord), + Parent ! rabbit_writer_send_command_signal, + State; +handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + Parent ! rabbit_writer_send_command_signal, + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, @@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_and_signal_back(W, MethodRecord, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Parent}, + ok. + +send_command_and_signal_back(W, MethodRecord, Content, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. |
