summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-10 17:41:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-10 17:41:29 +0100
commit9c358627c4529a60c2c48b9af8b6443cc5948527 (patch)
treee4de9baf8bd63a15549972d894215a988356cbb2
parent792d203882a9911a83c1baa396fec71d8f730670 (diff)
parent4d019b5fc64e7643b76d691fdc084633a5bd4b58 (diff)
downloadrabbitmq-server-git-9c358627c4529a60c2c48b9af8b6443cc5948527.tar.gz
merge in from default. All tests seem to pass.
-rw-r--r--.hgignore5
-rw-r--r--Makefile8
-rw-r--r--ebin/rabbit.rel7
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rwxr-xr-xscripts/activate-plugins47
-rw-r--r--scripts/activate-plugins.bat53
-rwxr-xr-xscripts/rabbitmq-server13
-rwxr-xr-xscripts/rabbitmq-server.bat17
-rw-r--r--src/gen_server2.erl209
-rw-r--r--src/rabbit.erl17
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_basic.erl25
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_misc.erl25
-rw-r--r--src/rabbit_plugin_activator.erl198
-rw-r--r--src/rabbit_writer.erl36
18 files changed, 530 insertions, 144 deletions
diff --git a/.hgignore b/.hgignore
index 35607765b3..3323e2fc59 100644
--- a/.hgignore
+++ b/.hgignore
@@ -10,6 +10,11 @@ syntax: regexp
^src/rabbit_framing.erl$
^rabbit.plt$
^ebin/rabbit.app$
+^ebin/rabbit.rel$
+^ebin/rabbit.boot$
+^ebin/rabbit.script$
+^plugins/
+^priv/plugins/
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
diff --git a/Makefile b/Makefile
index fb1853ba9f..e8ac727648 100644
--- a/Makefile
+++ b/Makefile
@@ -66,13 +66,13 @@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN
dialyze: $(BEAM_TARGETS)
dialyzer -c $?
-clean: cleandb
+clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
-cleandb: stop-node
+cleandb:
rm -rf $(RABBITMQ_MNESIA_DIR)/*
############ various tasks to interact with RabbitMQ ###################
@@ -98,7 +98,7 @@ run-node: all
run-tests: all
echo "rabbit_tests:all_tests()." | $(ERL_CALL)
-start-background-node: stop-node
+start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
@@ -134,7 +134,7 @@ srcdist: distclean
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/BUILD
- sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in
+ sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
diff --git a/ebin/rabbit.rel b/ebin/rabbit.rel
deleted file mode 100644
index c2d2067b9c..0000000000
--- a/ebin/rabbit.rel
+++ /dev/null
@@ -1,7 +0,0 @@
-{release, {"rabbit", "1.1.0-alpha"}, {erts, "1.14.2"},
- [{rabbit, "1.1.0-alpha"},
- {mnesia, "4.3.4"},
- {os_mon, "2.1.2"},
- {sasl, "2.1.5"},
- {stdlib, "1.14.4"},
- {kernel, "2.11.4"}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 8e1c890eb2..0057ea0478 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -1,7 +1,7 @@
{application, rabbit, %% -*- erlang -*-
[{description, "RabbitMQ"},
{id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
+ {vsn, "%%VSN%%"},
{modules, []},
{registered, [rabbit_amqqueue_sup,
rabbit_log,
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 9e7c4bfb69..eb953b81fa 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -34,11 +34,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
-
-# The rabbitmq build needs escript, which is missing from /usr/bin in
-# some versions of the erlang RPM. See
-# <https://bugzilla.redhat.com/show_bug.cgi?id=481302>
-PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags}
+make %{?_smp_mflags}
%install
rm -rf %{buildroot}
diff --git a/scripts/activate-plugins b/scripts/activate-plugins
new file mode 100755
index 0000000000..52f7ddbe61
--- /dev/null
+++ b/scripts/activate-plugins
@@ -0,0 +1,47 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+
+RABBITMQ_EBIN=`dirname $0`/../ebin
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins"
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins"
+
+exec erl \
+ -pa "$RABBITMQ_EBIN" \
+ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
+ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \
+ -noinput \
+ -hidden \
+ -s rabbit_plugin_activator \
+ -extra "$@"
diff --git a/scripts/activate-plugins.bat b/scripts/activate-plugins.bat
new file mode 100644
index 0000000000..8bef4ad266
--- /dev/null
+++ b/scripts/activate-plugins.bat
@@ -0,0 +1,53 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+if "%ERLANG_HOME%"=="" (
+ set ERLANG_HOME=%~dp0%..\..\..
+)
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
+set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 70e0c66bcd..2883df9da3 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -75,16 +75,25 @@ fi
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit'
+RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin"
+if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then
+ RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit"
+ RABBITMQ_EBIN_PATH=""
+else
+ RABBITMQ_BOOT_FILE=start_sasl
+ RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+fi
+
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
# there is no other way of preventing their expansion.
set -f
exec erl \
- -pa "`dirname $0`/../ebin" \
+ ${RABBITMQ_EBIN_PATH} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot start_sasl \
+ -boot ${RABBITMQ_BOOT_FILE} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
-rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 22dc10c605..3b6e493880 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -84,10 +84,10 @@ set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+ type %LOGS% >> %LOGS_BACKUP%
)
if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+ type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
)
rem End of log management
@@ -104,11 +104,20 @@ set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"
if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
)
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot start_sasl ^
+-boot %RABBITMQ_BOOT_FILE% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 529ed0295e..36fb4fa8c3 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -21,34 +21,41 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
-%% 5) init can return a 4th arg, {backoff, InitialTimeout,
+%% 5) The callback module can optionally implement
+%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
+%% called immediately prior to and post hibernation, respectively. If
+%% handle_pre_hibernate returns {hibernate, NewState} then the process
+%% will hibernate. If the module does not implement
+%% handle_pre_hibernate/1 then the default action is to hibernate.
+%%
+%% 6) init can return a 4th arg, {backoff, InitialTimeout,
%% MinimumTimeout, DesiredHibernatePeriod} (all in
%% milliseconds). Then, on all callbacks which can return a timeout
%% (including init), timeout can be 'hibernate'. When this is the
%% case, the current timeout value will be used (initially, the
%% InitialTimeout supplied from init). After this timeout has
-%% occurred, handle_pre_hibernate/1 will be called. If that returns
-%% {hibernate, State} then the process will be hibernated. Upon
-%% awaking, a new current timeout value will be calculated, and then
-%% handle_post_hibernate/1 will be called. The purpose is that the
-%% gen_server2 takes care of adjusting the current timeout value such
-%% that the process will increase the timeout value repeatedly if it
-%% is unable to sleep for the DesiredHibernatePeriod. If it is able to
-%% sleep for the DesiredHibernatePeriod it will decrease the current
-%% timeout down to the MinimumTimeout, so that the process is put to
-%% sleep sooner (and hopefully for longer). In short, should a process
+%% occurred, hibernation will occur as normal. Upon awaking, a new
+%% current timeout value will be calculated.
+%%
+%% The purpose is that the gen_server2 takes care of adjusting the
+%% current timeout value such that the process will increase the
+%% timeout value repeatedly if it is unable to sleep for the
+%% DesiredHibernatePeriod. If it is able to sleep for the
+%% DesiredHibernatePeriod it will decrease the current timeout down to
+%% the MinimumTimeout, so that the process is put to sleep sooner (and
+%% hopefully stays asleep for longer). In short, should a process
%% using this receive a burst of messages, it should not hibernate
%% between those messages, but as the messages become less frequent,
%% the process will not only hibernate, it will do so sooner after
%% each message.
%%
-%% Normal timeout values (i.e. not 'hibernate') can still be used, and
-%% if they are used then the handle_info(timeout, State) will be
-%% called as normal. In this case, returning 'hibernate' from
-%% handle_info(timeout, State) will not hibernate the process
-%% immediately, as it would if backoff wasn't being used. Instead
-%% it'll wait for the current timeout as described above, before
-%% calling handle_pre_hibernate(State).
+%% When using this backoff mechanism, normal timeout values (i.e. not
+%% 'hibernate') can still be used, and if they are used then the
+%% handle_info(timeout, State) will be called as normal. In this case,
+%% returning 'hibernate' from handle_info(timeout, State) will not
+%% hibernate the process immediately, as it would if backoff wasn't
+%% being used. Instead it'll wait for the current timeout as described
+%% above.
%% All modifications are (C) 2009 LShift Ltd.
@@ -157,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7, wake_hib/8]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -345,8 +352,7 @@ enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity, undefined).
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
- Backoff1 = extend_backoff(Mod, Backoff),
- enter_loop(Mod, Options, State, self(), infinity, Backoff1);
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity, undefined);
@@ -355,8 +361,7 @@ enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout, undefined).
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
- Backoff1 = extend_backoff(Mod, Backoff),
- enter_loop(Mod, Options, State, ServerName, infinity, Backoff1);
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
@@ -366,7 +371,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug).
+ Backoff1 = extend_backoff(Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -393,8 +399,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
{ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ Backoff1 = extend_backoff(Backoff),
proc_lib:init_ack(Starter, {ok, self()}),
- Backoff1 = extend_backoff(Mod, Backoff),
loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
@@ -433,11 +439,10 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
-extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
- Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1),
- Post = erlang:function_exported(Mod, handle_post_hibernate, 1),
- random:seed(now()), %% call before we get into the loop
- {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}.
+extend_backoff(undefined) ->
+ undefined;
+extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
%%%========================================================================
%%% Internal functions
@@ -446,8 +451,7 @@ extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
%%% The MAIN loop.
%%% ---------------------------------------------------
loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,
- [Parent, Name, State, Mod, undefined, Queue, Debug]);
+ pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
drain(Queue), Debug).
@@ -466,29 +470,29 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
- {hibernate,
- {backoff, Current, _Min, _Desired, _Pre, _Post}} ->
+ {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
{Current, true};
{hibernate, _} ->
%% wake_hib/7 will set Time to hibernate. If
%% we were woken and didn't receive a msg
%% then we will get here and need a sensible
%% value for Time1, otherwise we crash.
- %% On the grounds that it's better to get
- %% control back to the user module sooner
- %% rather than later, 0 is more sensible
- %% than infinity here.
- {0, false};
+ %% R13B1 always waits infinitely when waking
+ %% from hibernation, so that's what we do
+ %% here too.
+ {infinity, false};
_ -> {Time, false}
end,
receive
Input ->
- loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue1), Debug)
+ %% Time could be 'hibernate' here, so *don't* call loop
+ process_next_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ drain(in(Input, Queue1)), Debug)
after Time1 ->
case HibOnTimeout of
true ->
- backoff_pre_hibernate(
+ pre_hibernate(
Parent, Name, State, Mod, TimeoutState, Queue1,
Debug);
false ->
@@ -499,43 +503,66 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
end
end.
-wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
- drain(Queue), Debug).
-
-wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
- backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(),
- TimeoutState, drain(Queue), Debug).
-
-backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
- {backoff, _Current, _Minimum, _Desired, Pre, _Post},
- Queue, Debug) ->
- case Pre of
+wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+ TimeoutState1 = case TS of
+ undefined ->
+ undefined;
+ {SleptAt, TimeoutState} ->
+ adjust_timeout_state(SleptAt, now(), TimeoutState)
+ end,
+ post_hibernate(Parent, Name, State, Mod, TimeoutState1,
+ drain(Queue), Debug).
+
+hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ TS = case TimeoutState of
+ undefined -> undefined;
+ {backoff, _, _, _, _} -> {now(), TimeoutState}
+ end,
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ TS, Queue, Debug]).
+
+pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
true ->
case catch Mod:handle_pre_hibernate(State) of
{hibernate, NState} ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState,
- Mod, now(),
- TimeoutState, Queue,
- Debug]);
- {stop, Reason, NState} ->
- terminate(Reason, Name, pre_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, pre_hibernate, Mod, State, []);
+ hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
+ Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, pre_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ end.
+
+post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_post_hibernate, 1) of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity,
+ TimeoutState, Queue, Debug);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time,
+ TimeoutState, Queue, Debug);
Reply ->
- terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
- State, [])
+ handle_common_termination(Reply, Name, post_hibernate,
+ Mod, State, Debug)
end;
false ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
- now(), TimeoutState, Queue,
- Debug])
+ %% use hibernate here, not infinity. This matches
+ %% R13B. The key is that we should be able to get through
+ %% to process_msg calling sys:handle_system_msg with Time
+ %% still set to hibernate, iff that msg is the very msg
+ %% that woke us up (or the first msg we receive after
+ %% waking up).
+ process_next_msg(Parent, Name, State, Mod, hibernate,
+ TimeoutState, Queue, Debug)
end.
-backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- {backoff, CurrentTO, MinimumTO, DesiredHibPeriod,
- Pre, Post},
- Queue, Debug) ->
+adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
+ DesiredHibPeriod, RandomState}) ->
NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
@@ -548,29 +575,9 @@ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
true -> lists:max([MinimumTO, CurrentTO div 2]);
false -> CurrentTO
end,
- CurrentTO1 = Base + random:uniform(Base),
- TimeoutState =
- {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post},
- case Post of
- true ->
- case catch Mod:handle_post_hibernate(State) of
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState,
- Queue, Debug);
- {noreply, NState, Time} ->
- loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue,
- Debug);
- {stop, Reason, NState} ->
- terminate(Reason, Name, post_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, post_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, post_hibernate,
- Mod, State, [])
- end;
- false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue,
- Debug)
- end.
+ {Extra, RandomState1} = random:uniform_s(Base, RandomState),
+ CurrentTO1 = Base + Extra,
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
@@ -862,12 +869,8 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, []);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, [])
end.
handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
@@ -882,6 +885,12 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
+ end.
+
+handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
+ case Reply of
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 9587238835..f1dcd51f0d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -213,8 +213,21 @@ log_location(Type) ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n",
- [Product, Version,
+ ProductLen = string:len(Product),
+ io:format("~n"
+ "+---+ +---+~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| +---+ +-------+~n"
+ "| |~n"
+ "| ~s +---+ |~n"
+ "| | | |~n"
+ "| ~s +---+ |~n"
+ "| |~n"
+ "+-------------------+~n"
+ "AMQP ~p-~p~n~s~n~s~n~n",
+ [Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 15c5e90738..50ad102376 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -52,8 +52,6 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(CALL_TIMEOUT, 5000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index f70d6067b3..8adb608fdb 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -35,6 +35,7 @@
-export([publish/1, message/4, message/5, message/6, delivery/4]).
-export([properties/1, publish/4, publish/7]).
+-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -57,6 +58,8 @@
-spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
+-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
+-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
-endif.
@@ -76,6 +79,21 @@ delivery(Mandatory, Immediate, Txn, Message) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message}.
+build_content(Properties, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ #content{class_id = ClassId,
+ properties = Properties,
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]}.
+
+from_content(Content) ->
+ #content{class_id = ClassId,
+ properties = Props,
+ payload_fragments_rev = FragmentsRev} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ {Props, list_to_binary(lists:reverse(FragmentsRev))}.
+
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()).
@@ -84,14 +102,9 @@ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) ->
Properties = properties(RawProperties),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
- Content = #content{class_id = ClassId,
- properties = Properties,
- properties_bin = none,
- payload_fragments_rev = [BodyBin]},
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
- content = Content,
+ content = build_content(Properties, BodyBin),
guid = MsgId,
is_persistent = IsPersistent}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index adf2462dac..3866f0a82c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- gen_server2:cast(Pid, {conserve_memory, Conserve}).
+ gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6985956460..79578000d2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -36,7 +36,7 @@
-record(params, {quiet, node, command, args}).
--define(RPC_TIMEOUT, 30000).
+-define(RPC_TIMEOUT, infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index fc30834e3a..c328c111f8 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -41,6 +41,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
+-export([enable_cover/1, report_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -90,6 +91,8 @@
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
+-spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}).
+-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -193,17 +196,27 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
[Kind, Name, VHostPath])).
enable_cover() ->
- case cover:compile_beam_directory("ebin") of
+ enable_cover(".").
+
+enable_cover([Root]) when is_atom(Root) ->
+ enable_cover(atom_to_list(Root));
+enable_cover(Root) ->
+ case cover:compile_beam_directory(filename:join(Root, "ebin")) of
{error,Reason} -> {error,Reason};
_ -> ok
end.
report_cover() ->
- Dir = "cover/",
- ok = filelib:ensure_dir(Dir),
+ report_cover(".").
+
+report_cover([Root]) when is_atom(Root) ->
+ report_cover(atom_to_list(Root));
+report_cover(Root) ->
+ Dir = filename:join(Root, "cover"),
+ ok = filelib:ensure_dir(filename:join(Dir,"junk")),
lists:foreach(fun(F) -> file:delete(F) end,
- filelib:wildcard(Dir ++ "*.html")),
- {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]),
+ filelib:wildcard(filename:join(Dir, "*.html"))),
+ {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
fun(M,{CovTot, NotCovTot}) ->
@@ -212,7 +225,7 @@ report_cover() ->
Cov, NotCov, M),
{ok,_} = cover:analyze_to_file(
M,
- Dir ++ atom_to_list(M) ++ ".html",
+ filename:join(Dir, atom_to_list(M) ++ ".html"),
[html]),
{CovTot+Cov, NotCovTot+NotCov}
end,
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
new file mode 100644
index 0000000000..71278bfb2a
--- /dev/null
+++ b/src/rabbit_plugin_activator.erl
@@ -0,0 +1,198 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_plugin_activator).
+
+-export([start/0, stop/0]).
+
+-define(DefaultPluginDir, "plugins").
+-define(DefaultUnpackedPluginDir, "priv/plugins").
+-define(DefaultRabbitEBin, "ebin").
+-define(BaseApps, [rabbit]).
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ %% Ensure Rabbit is loaded so we can access it's environment
+ application:load(rabbit),
+
+ %% Determine our various directories
+ PluginDir = get_env(plugins_dir, ?DefaultPluginDir),
+ UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
+ RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
+
+ %% Unpack any .ez plugins
+ unpack_ez_plugins(PluginDir, UnpackedPluginDir),
+
+ %% Build a list of required apps based on the fixed set, and any plugins
+ RequiredApps = ?BaseApps ++
+ find_plugins(PluginDir) ++
+ find_plugins(UnpackedPluginDir),
+
+ %% Build the entire set of dependencies - this will load the
+ %% applications along the way
+ AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
+ {unknown_app, {App, Err}} ->
+ io:format("ERROR: Failed to load application " ++
+ "~s: ~p~n", [App, Err]),
+ halt(1);
+ AppList ->
+ AppList
+ end,
+ AppVersions = [determine_version(App) || App <- AllApps],
+ {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
+
+ %% Build the overall release descriptor
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
+ {erts, erlang:system_info(version)},
+ AppVersions},
+
+ %% Write it out to ebin/rabbit.rel
+ file:write_file(RabbitEBin ++ "/rabbit.rel",
+ io_lib:format("~p.~n", [RDesc])),
+
+ %% Compile the script
+ case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of
+ {ok, Module, Warnings} ->
+ %% This gets lots of spurious no-source warnings when we
+ %% have .ez files, so we want to supress them to prevent
+ %% hiding real issues.
+ WarningStr = Module:format_warning(
+ [W || W <- Warnings,
+ case W of
+ {warning, {source_not_found, _}} -> false;
+ _ -> true
+ end]),
+ case length(WarningStr) of
+ 0 -> ok;
+ _ -> io:format("~s", [WarningStr])
+ end,
+ ok;
+ {error, Module, Error} ->
+ io:format("Boot file generation failed: ~s~n",
+ [Module:format_error(Error)]),
+ halt(1)
+ end,
+ halt(),
+ ok.
+
+stop() ->
+ ok.
+
+get_env(Key, Default) ->
+ case application:get_env(rabbit, Key) of
+ {ok, V} -> V;
+ _ -> Default
+ end.
+
+determine_version(App) ->
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
+ {App, Vsn}.
+
+assert_dir(Dir) ->
+ case filelib:is_dir(Dir) of
+ true -> ok;
+ false ->
+ ok = filelib:ensure_dir(Dir),
+ ok = file:make_dir(Dir)
+ end.
+delete_dir(Dir) ->
+ case filelib:is_dir(Dir) of
+ true ->
+ case file:list_dir(Dir) of
+ {ok, Files} ->
+ [case Dir ++ "/" ++ F of
+ Fn ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
+ true -> delete_dir(Fn);
+ false -> file:delete(Fn)
+ end
+ end || F <- Files]
+ end,
+ ok = file:del_dir(Dir);
+ false ->
+ ok
+ end.
+is_symlink(Name) ->
+ case file:read_link(Name) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+ %% Eliminate the contents of the destination directory
+ delete_dir(PluginDestDir),
+
+ assert_dir(PluginDestDir),
+ [unpack_ez_plugin(PluginName, PluginDestDir) ||
+ PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+
+unpack_ez_plugin(PluginFn, PluginDestDir) ->
+ zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
+ ok.
+
+find_plugins(PluginDir) ->
+ [prepare_dir_plugin(PluginName) ||
+ PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
+
+prepare_dir_plugin(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN),
+
+ %% We want the second-last token
+ NameTokens = string:tokens(PluginAppDescFn,"/."),
+ PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
+ list_to_atom(PluginNameString).
+
+expand_dependencies(Pending) ->
+ expand_dependencies(sets:new(), Pending).
+expand_dependencies(Current, []) ->
+ Current;
+expand_dependencies(Current, [Next|Rest]) ->
+ case sets:is_element(Next, Current) of
+ true ->
+ expand_dependencies(Current, Rest);
+ false ->
+ case application:load(Next) of
+ ok ->
+ ok;
+ {error, {already_loaded, _}} ->
+ ok;
+ X ->
+ throw({unknown_app, {Next, X}})
+ end,
+ {ok, Required} = application:get_key(Next, applications),
+ Unique = [A || A <- Required, not(sets:is_element(A, Current))],
+ expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
+ end.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 9cf9f8aef9..e338ddfe9d 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3,
- send_command_and_notify/5]).
+-export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([send_command/2, send_command/3, send_command_and_signal_back/3,
+ send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/3, internal_send_command/5]).
-import(gen_tcp).
@@ -49,8 +49,12 @@
-ifdef(use_specs).
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
+-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
+-spec(send_command_and_signal_back/4 ::
+ (pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
-spec(internal_send_command/3 ::
@@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) ->
channel = Channel,
frame_max = FrameMax}]).
+start_link(Sock, Channel, FrameMax) ->
+ spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}]).
+
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
@@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content},
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax),
State;
+handle_message({send_command_and_signal_back, MethodRecord, Parent},
+ State = #wstate{sock = Sock, channel = Channel}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
+handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
+ State = #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord,
+ Content, FrameMax),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
@@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_and_signal_back(W, MethodRecord, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Parent},
+ ok.
+
+send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.