diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-27 18:19:56 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-27 18:19:56 +0100 |
| commit | 14c76021b6b58dbf973cb229cfa00e7ba23130d9 (patch) | |
| tree | 6cc9baee2d801dc6a40ccc7b4d67d3ce4dc835bc | |
| parent | 2a1598e9adcce9aef5631cd772f16ae56d30cc1d (diff) | |
| parent | 27ce44e7d420dba66832183a314ee24d6d5e7c3c (diff) | |
| download | rabbitmq-server-git-14c76021b6b58dbf973cb229cfa00e7ba23130d9.tar.gz | |
merge default into bug20284
The tests fail right now because queue_index sends out more messages
than expected.
49 files changed, 1342 insertions, 913 deletions
@@ -4,7 +4,6 @@ RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) -RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch DEPS_FILE=deps.mk SOURCE_DIR=src @@ -147,8 +146,7 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \ RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \ RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \ - RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \ - RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)" + RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ @@ -239,7 +237,7 @@ distclean: clean %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \ gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c0d9aedae1..17518ddf68 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,9 @@ done rm -rf %{buildroot} %changelog +* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1 +- New Upstream Release + * Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 0dccf93879..7ee6001630 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.0.0-1) karmic; urgency=low + + * New Upstream Release + + -- Michael Bridgen <mikeb@rabbitmq.com> Mon, 23 Aug 2010 14:55:39 +0100 + rabbitmq-server (1.8.1-1) lucid; urgency=low * New 
Upstream Release diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index d52dc774b6..9310752f6f 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -39,7 +39,6 @@ CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia -PLUGINS_EXPAND_DIR=/var/lib/rabbitmq/plugins-scratch SERVER_START_ARGS= . `dirname $0`/rabbitmq-env @@ -70,7 +69,6 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${PLUGINS_EXPAND_DIR}" ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} @@ -91,14 +89,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" then - RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" + RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit" RABBITMQ_EBIN_PATH="" else exit 1 diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b1a91f47cd..5bcbc6babd 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -110,24 +110,21 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 95e5eebf86..4b3961d43b 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -180,24 +180,21 @@ if errorlevel 1 ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/delegate.erl b/src/delegate.erl index 3f57953bf7..c8aa3092c7 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,7 +44,7 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())). +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 39ef3f85b8..ff303ee28c 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -43,7 +43,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any()) | 'ignore'). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -endif. diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e209ee6be4..f83fa0bc11 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -116,13 +116,13 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available. transfer/1 transfers ownership +%% of a file descriptor between processes. It is non-blocking. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. 
-behaviour(gen_server). @@ -130,17 +130,27 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([release_on_death/1, obtain/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). --define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). + +%% Googling around suggests that Windows has a limit somewhere around +%% 16M, eg +%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx +%% however, it turns out that's only available through the win32 +%% API. Via the C Runtime, we have just 512: +%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx +-define(FILE_HANDLES_LIMIT_WINDOWS, 512). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). +-define(CLIENT_ETS_TABLE, ?MODULE). + %%---------------------------------------------------------------------------- -record(file, @@ -168,13 +178,31 @@ -record(fhc_state, { elders, limit, - count, - obtains, - callbacks, - client_mrefs, + open_count, + open_pending, + obtain_limit, + obtain_count, + obtain_pending, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -182,8 +210,8 @@ -ifdef(use_specs). -type(ref() :: any()). --type(ok_or_error() :: rabbit_types:ok_or_error(any())). --type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())). 
+-type(ok_or_error() :: 'ok' | {'error', any()}). +-type(val_or_error(T) :: {'ok', T} | {'error', any()}). -type(position() :: ('bof' | 'eof' | non_neg_integer() | {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})). @@ -210,8 +238,10 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(release_on_death/1 :: (pid()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). -endif. @@ -238,9 +268,9 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, bof, new) of - {ok, _Handle} -> + false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + case get_or_reopen([{Ref, new}]) of + {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; false -> RCount @@ -251,6 +281,7 @@ open(Path, Mode, Options) -> has_writer = HasWriter1 }), {ok, Ref}; Error -> + erase({Ref, fhc_handle}), Error end end. 
@@ -301,7 +332,7 @@ append(Ref, Data) -> Size1 = Size + iolist_size(Data), Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, write_buffer_size = Size1 }, - case Limit /= infinity andalso Size1 > Limit of + case Limit =/= infinity andalso Size1 > Limit of true -> {Result, Handle3} = write_buffer(Handle2), {Result, [Handle3]}; false -> {ok, [Handle2]} @@ -375,7 +406,8 @@ copy(Src, Dest, Count) -> {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, - DHandle #handle { offset = DOffset + Count1 }]}; + DHandle #handle { offset = DOffset + Count1, + is_dirty = true }]}; Error -> {Error, [SHandle, DHandle]} end; @@ -420,29 +452,29 @@ set_maximum_since_use(MaximumAge) -> case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - Age = timer:now_diff(Now, Then), - case Hdl /= closed andalso Age >= MaximumAge of - true -> {Res, Handle1} = soft_close(Handle), - case Res of - ok -> put({Ref, fhc_handle}, Handle1), - false; - _ -> put_handle(Ref, Handle1), - Rep - end; + case Hdl =/= closed andalso + timer:now_diff(Now, Then) >= MaximumAge of + true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; (_KeyValuePair, Rep) -> Rep - end, true, get()) of - true -> age_tree_change(), ok; - false -> ok + end, false, get()) of + false -> age_tree_change(), ok; + true -> ok end. -release_on_death(Pid) when is_pid(Pid) -> - gen_server:cast(?SERVER, {release_on_death, Pid}). - obtain() -> - gen_server:call(?SERVER, obtain, infinity). + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). + +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -459,18 +491,9 @@ append_to_write(Mode) -> end. 
with_handles(Refs, Fun) -> - ResHandles = lists:foldl( - fun (Ref, {ok, HandlesAcc}) -> - case get_or_reopen(Ref) of - {ok, Handle} -> {ok, [Handle | HandlesAcc]}; - Error -> Error - end; - (_Ref, Error) -> - Error - end, {ok, []}, Refs), - case ResHandles of + case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of {ok, Handles} -> - case Fun(lists:reverse(Handles)) of + case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -499,36 +522,94 @@ with_flushed_handles(Refs, Fun) -> end end). -get_or_reopen(Ref) -> - case get({Ref, fhc_handle}) of - undefined -> - {error, not_open, Ref}; - #handle { hdl = closed, offset = Offset, - path = Path, mode = Mode, options = Options } -> - open1(Path, Mode, Options, Ref, Offset, reopen); - Handle -> - {ok, Handle} +get_or_reopen(RefNewOrReopens) -> + case partition_handles(RefNewOrReopens) of + {OpenHdls, []} -> + {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; + {OpenHdls, ClosedHdls} -> + Oldest = oldest(get_age_tree(), fun () -> now() end), + case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of + ok -> + case reopen(ClosedHdls) of + {ok, RefHdls} -> sort_handles(RefNewOrReopens, + OpenHdls, RefHdls, []); + Error -> Error + end; + close -> + [soft_close(Ref, Handle) || + {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- + get(), + Hdl =/= closed], + get_or_reopen(RefNewOrReopens) + end + end. + +reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). 
+ +reopen([], Tree, RefHdls) -> + put_age_tree(Tree), + {ok, lists:reverse(RefHdls)}; +reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, + path = Path, + mode = Mode, + offset = Offset, + last_used_at = undefined }} | + RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> + case file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of + {ok, Hdl} -> + Now = now(), + {{ok, Offset1}, Handle1} = + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + last_used_at = Now }), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + [{Ref, Handle2} | RefHdls]); + Error -> + %% NB: none of the handles in ToOpen are in the age tree + Oldest = oldest(Tree, fun () -> undefined end), + [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + put_age_tree(Tree), + Error end. +partition_handles(RefNewOrReopens) -> + lists:foldr( + fun ({Ref, NewOrReopen}, {Open, Closed}) -> + case get({Ref, fhc_handle}) of + #handle { hdl = closed } = Handle -> + {Open, [{Ref, NewOrReopen, Handle} | Closed]}; + #handle {} = Handle -> + {[{Ref, Handle} | Open], Closed} + end + end, {[], []}, RefNewOrReopens). + +sort_handles([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); +sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). + put_handle(Ref, Handle = #handle { last_used_at = Then }) -> Now = now(), age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -with_age_tree(Fun) -> - put(fhc_age_tree, Fun(case get(fhc_age_tree) of - undefined -> gb_trees:empty(); - AgeTree -> AgeTree - end)). +with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())). 
-age_tree_insert(Now, Ref) -> - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - gen_server:cast(?SERVER, {open, self(), Oldest}), - Tree1 - end). +get_age_tree() -> + case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end. + +put_age_tree(Tree) -> put(fhc_age_tree, Tree). age_tree_update(Then, Now, Ref) -> with_age_tree( @@ -540,13 +621,7 @@ age_tree_delete(Then) -> with_age_tree( fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), - Oldest = case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = gb_trees:smallest(Tree1), - Oldest1 - end, + Oldest = oldest(Tree1, fun () -> undefined end), gen_server:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -562,48 +637,53 @@ age_tree_change() -> Tree end). -open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> - Mode1 = case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end, - case file:open(Path, Mode1) of - {ok, Hdl} -> - WriteBufferSize = - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end, - Now = now(), - Handle = #handle { hdl = Hdl, - offset = 0, - trusted_offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = Now }, - {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), - age_tree_insert(Now, Ref), - {ok, Handle2}; - {error, Reason} -> - {error, Reason} +oldest(Tree, DefaultFun) -> + case gb_trees:is_empty(Tree) of + true -> DefaultFun(); + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + Oldest + end. 
+ +new_closed_handle(Path, Mode, Options) -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Ref = make_ref(), + put({Ref, fhc_handle}, #handle { hdl = closed, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = undefined }), + {ok, Ref}. + +soft_close(Ref, Handle) -> + {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + true; + _ -> put_handle(Ref, Handle1), + false end. soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + {ok, #handle { hdl = Hdl, + offset = Offset, + is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of true -> file:sync(Hdl); @@ -611,8 +691,10 @@ soft_close(Handle) -> end, ok = file:close(Hdl), age_tree_delete(Then), - {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, - is_dirty = false }}; + {ok, Handle1 #handle { hdl = closed, + trusted_offset = Offset, + is_dirty = false, + last_used_at = undefined }}; {_Error, _Handle} = Result -> Result end. @@ -701,114 +783,303 @@ init([]) -> _ -> ulimit() end, - error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new(), timer_ref = undefined }}. 
- -handle_call(obtain, From, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = - maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit /= infinity andalso Count1 >= Limit of - true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], - count = Count1 - 1 }}; - false -> {reply, ok, State1} - end. - -handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; - -handle_cast({open, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> + ObtainLimit = obtain_limit(Limit), + error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", + [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), + {ok, #fhc_state { elders = dict:new(), + limit = Limit, + open_count = 0, + open_pending = pending_new(), + obtain_limit = ObtainLimit, + obtain_count = 0, + obtain_pending = pending_new(), + clients = Clients, + timer_ref = undefined }}. 
+ +handle_call({open, Pid, Requested, EldestUnusedSince}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders, + clients = Clients }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - {noreply, maybe_reduce( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count + 1 }))}; + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, + ok = track_client(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, + case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State1 #fhc_state { + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> + true = ets:update_element( + Clients, Pid, + {#cstate.pending_closes, Opened}), + {reply, close, State1} + end; + false -> {noreply, run_pending_item(Item, State1)} + end; -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + clients = Clients }) + when Limit =/= infinity andalso Count >= Limit -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + clients = Clients }) -> + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + ok = track_client(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, 
{#cstate.blocked, true}), + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; + false -> + {noreply, run_pending_item(Item, State)} + end; +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. + +handle_cast({register_callback, Pid, MFA}, + State = #fhc_state { clients = Clients }) -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; + +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count - 1 }))}; + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), + {noreply, process_pending( + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; + +handle_cast({transfer, FromPid, ToPid}, State) -> + ok = track_client(ToPid, State#fhc_state.clients), + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = 
undefined })}; - -handle_cast({release_on_death, Pid}, State) -> - _MRef = erlang:monitor(process, Pid), - {noreply, State}. - -handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_obtains( - case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } - end)}. - -terminate(_Reason, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + clients = Clients }) -> + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, + {noreply, process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) })}. + +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. 
+ +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- -process_obtains(State = #fhc_state { obtains = [] }) -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count }) - when Limit /= infinity andalso Count >= Limit -> +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of + OLimit when OLimit < 0 -> 0; + OLimit -> OLimit + end. + +requested({_Kind, _Pid, Requested, _From}) -> + Requested. + +process_pending(State = #fhc_state { limit = infinity }) -> State; -process_obtains(State = #fhc_state { limit = Limit, count = Count, - obtains = Obtains }) -> - ObtainsLen = length(Obtains), - ObtainableLen = lists:min([ObtainsLen, Limit - Count]), - Take = ObtainsLen - ObtainableLen, - {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), - [gen_server:reply(From, ok) || From <- ObtainableRev], - State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. 
- -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, - callbacks = Callbacks, timer_ref = TRef }) - when Limit /= infinity andalso Count >= Limit -> +process_pending(State) -> + process_open(process_obtain(State)). + +process_open(State = #fhc_state { limit = Limit, + open_pending = Pending, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Pending1, State1} = + process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + State1 #fhc_state { open_pending = Pending1 }. + +process_obtain(State = #fhc_state { limit = Limit, + obtain_pending = Pending, + obtain_limit = ObtainLimit, + obtain_count = ObtainCount, + open_count = OpenCount }) -> + Quota = lists:min([ObtainLimit - ObtainCount, + Limit - (ObtainCount + OpenCount)]), + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. + +process_pending(Pending, Quota, State) when Quota =< 0 -> + {Pending, State}; +process_pending(Pending, Quota, State) -> + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end. + +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, + State = #fhc_state { clients = Clients }) -> + gen_server:reply(From, ok), + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, Requested, State). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { open_count = OpenCount, + obtain_count = ObtainCount, + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. 
+ +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. + +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (not pending_is_empty(OpenPending)) + orelse (ObtainCount < ObtainLimit + andalso not pending_is_empty(ObtainPending))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of + true -> Accs; + false -> {[CState | CStatesAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), - case Pids of + case CStates of [] -> ok; - _ -> AverageAge = Sum / ClientCount, - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + pending_count(OpenPending) + + 
pending_count(ObtainPending)) + end end, case TRef of undefined -> {ok, TRef1} = timer:apply_after( @@ -816,20 +1087,51 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, gen_server, cast, [?SERVER, check_counts]), State #fhc_state { timer_ref = TRef1 }; _ -> State - end; -maybe_reduce(State) -> - State. + end. -%% Googling around suggests that Windows has a limit somewhere around -%% 16M, eg -%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx -%% For everything else, assume ulimit exists. Further googling -%% suggests that BSDs (incl OS X), solaris and linux all agree that -%% ulimit -n is file handles +notify_age(CStates, AverageAge) -> + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). + +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok + end. + +%% For all unices, assume ulimit exists. 
Further googling suggests +%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n +%% is file handles ulimit() -> case os:type() of {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS; + ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS; {unix, _OsName} -> %% Under Linux, Solaris and FreeBSD, ulimit is a shell %% builtin, not a command. In OS X, it's a command. @@ -852,11 +1154,3 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. - -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } - end. diff --git a/src/gatherer.erl b/src/gatherer.erl index 31dda16e83..1e03d6c41c 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -42,7 +42,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(stop/1 :: (pid()) -> 'ok'). -spec(fork/1 :: (pid()) -> 'ok'). -spec(finish/1 :: (pid()) -> 'ok'). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index f1c8eb4d9d..9fb9e2fea7 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -976,7 +976,7 @@ print_event(Dev, Event, Name) -> terminate(Reason, Name, Msg, Mod, State, Debug) -> case catch Mod:terminate(Reason, State) of {'EXIT', R} -> - error_info(R, Name, Msg, State, Debug), + error_info(R, Reason, Name, Msg, State, Debug), exit(R); _ -> case Reason of @@ -987,42 +987,44 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> {shutdown,_}=Shutdown -> exit(Shutdown); _ -> - error_info(Reason, Name, Msg, State, Debug), + error_info(Reason, undefined, Name, Msg, State, Debug), exit(Reason) end end. 
-error_info(_Reason, application_controller, _Msg, _State, _Debug) -> +error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> %% OTP-5811 Don't send an error report if it's the system process %% application_controller which is terminating - let init take care %% of it instead ok; -error_info(Reason, Name, Msg, State, Debug) -> - Reason1 = - case Reason of - {undef,[{M,F,A}|MFAs]} -> - case code:is_loaded(M) of - false -> - {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> - case erlang:function_exported(M, F, length(A)) of - true -> - Reason; - false -> - {'function not exported',[{M,F,A}|MFAs]} - end - end; - _ -> - Reason - end, - format("** Generic server ~p terminating \n" - "** Last message in was ~p~n" - "** When Server state == ~p~n" - "** Reason for termination == ~n** ~p~n", - [Name, Msg, State, Reason1]), +error_info(Reason, RootCause, Name, Msg, State, Debug) -> + Reason1 = error_reason(Reason), + Fmt = + "** Generic server ~p terminating~n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + case RootCause of + undefined -> format(Fmt, [Name, Msg, State, Reason1]); + _ -> format(Fmt ++ "** In 'terminate' callback " + "with reason ==~n** ~p~n", + [Name, Msg, State, Reason1, + error_reason(RootCause)]) + end, sys:print_log(Debug), ok. +error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> + case code:is_loaded(M) of + false -> {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> case erlang:function_exported(M, F, length(A)) of + true -> Reason; + false -> {'function not exported',[{M,F,A}|MFAs]} + end + end; +error_reason(Reason) -> + Reason. + %%% --------------------------------------------------- %%% Misc. functions. %%% --------------------------------------------------- diff --git a/src/pg_local.erl b/src/pg_local.erl index f5ded123d7..49fa873ae4 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -45,8 +45,8 @@ -type(name() :: term()). 
--spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), term())). --spec(start/0 :: () -> rabbit_types:ok_or_error2(pid(), term())). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(join/2 :: (name(), pid()) -> 'ok'). -spec(leave/2 :: (name(), pid()) -> 'ok'). -spec(get_members/1 :: (name()) -> [pid()]). diff --git a/src/rabbit.erl b/src/rabbit.erl index 41c628a071..303d1f3ac0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -83,12 +83,6 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). --rabbit_boot_step({rabbit_hooks, - [{description, "internal event notification system"}, - {mfa, {rabbit_hooks, start, []}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c25fd9b4cb..99faa511c3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. + [Q || Q <- Qs, + gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. 
declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 15939802ae..ce5c8162a1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -631,6 +631,7 @@ handle_call({init, Recover}, From, declare(Recover, From, State); _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined} = State, + gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( @@ -638,7 +639,7 @@ handle_call({init, Recover}, From, end, BQS = BQ:init(QName, IsDurable, Recover), %% Rely on terminate to delete the queue. - {stop, normal, not_found, State#q{backing_queue_state = BQS}} + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ae49b30127..0964700c29 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). @@ -44,7 +44,7 @@ handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, - transaction_id, tx_participants, next_tag, + start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, @@ -80,9 +80,11 @@ -type(channel_number() :: non_neg_integer()). 
--spec(start_link/6 :: +-spec(start_link/7 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). + rabbit_types:vhost(), pid(), + fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -106,9 +108,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []). +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun) -> + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -163,15 +166,16 @@ confirm(Pid, MsgSeqNo) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun]) -> process_flag(trap_exit, true), - link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{ state = starting, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -281,24 +285,19 @@ handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) -> end, State, QPids)}. -handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, - State = #ch{writer_pid = WriterPid}) -> - State#ch.reader_pid ! 
{channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{qpid_to_msgs = QTM}) -> - case dict:find(QPid, QTM) of - {ok, Msgs} -> - S = gb_sets:fold(fun (MsgSeqNo, State0) -> - send_or_enqueue_ack(MsgSeqNo, State0) - end, State, Msgs), - {noreply, S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}}; - error -> - erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)} - end. + State1 = case dict:find(QPid, QTM) of + {ok, Msgs} -> + S = gb_sets:fold(fun (MsgSeqNo, State0) -> + send_or_enqueue_ack(MsgSeqNo, State0) + end, State, Msgs), + S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}; + error -> + State + end, + erase_queue_stats(QPid), + {noreply, queue_blocked(QPid, State1)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -310,8 +309,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). @@ -506,7 +507,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), - ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), stop; handle_method(#'access.request'{},_, State) -> @@ -1185,8 +1186,8 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> + {ok, LPid} = SLF(queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. 
@@ -1258,12 +1259,10 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> +terminate(State) -> stop_ack_timer(State), pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + rabbit_event:notify(channel_closed, [{pid, self()}]). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1292,11 +1291,9 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> end. incr_stats({QPid, _} = QX, Inc, Measure) -> - io:format("incr_stats for ~p~n", [QPid]), maybe_monitor(QPid), update_measures(queue_exchange_stats, QX, Inc, Measure); incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - io:format("incr_stats for ~p~n", [QPid]), maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); incr_stats(X, Inc, Measure) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 0000000000..02199a6516 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,96 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([start_link_args/0]). + +-type(start_link_args() :: + {rabbit_types:protocol(), rabbit_net:socket(), + rabbit_channel:channel_number(), non_neg_integer(), pid(), + rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). + +-endif. 
+ +%%---------------------------------------------------------------------------- + +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ReaderPid, ChannelPid, Protocol]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, SupPid, FramingChannelPid}. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_limiter_fun(SupPid) -> + fun (UnackedCount) -> + Me = self(), + {ok, _Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}) + end. diff --git a/src/rabbit_hooks.erl b/src/rabbit_channel_sup_sup.erl index 3fc84c1e09..d193880555 100644 --- a/src/rabbit_hooks.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -29,45 +29,37 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_hooks). +-module(rabbit_channel_sup_sup). --export([start/0]). --export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). +-behaviour(supervisor2). --define(TableName, rabbit_hooks). +-export([start_link/0, start_channel/2]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start/0 :: () -> 'ok'). 
--spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). --spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). --spec(trigger/2 :: (atom(), list()) -> 'ok'). --spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), pid()}). -endif. -start() -> - ets:new(?TableName, [bag, public, named_table]), - ok. +%%---------------------------------------------------------------------------- -subscribe(Hook, HandlerName, Handler) -> - ets:insert(?TableName, {Hook, HandlerName, Handler}), - ok. +start_link() -> + supervisor2:start_link(?MODULE, []). -unsubscribe(Hook, HandlerName) -> - ets:match_delete(?TableName, {Hook, HandlerName, '_'}), - ok. +start_channel(Pid, Args) -> + {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), + link(ChSupPid), + Result. -trigger(Hook, Args) -> - Hooks = ets:lookup(?TableName, Hook), - [case catch apply(M, F, [Hook, Name, Args | A]) of - {'EXIT', Reason} -> - rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", - [Name, Hook, Reason]); - _ -> ok - end || {_, Name, {M, F, A}} <- Hooks], - ok. +%%---------------------------------------------------------------------------- -notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> - Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, - ok. +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl new file mode 100644 index 0000000000..69e21d73cc --- /dev/null +++ b/src/rabbit_connection_sup.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. 
You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_connection_sup). + +-behaviour(supervisor2). + +-export([start_link/0, reader/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid(), pid()}). +-spec(reader/1 :: (pid()) -> pid()). + +-endif. 
+ +%%-------------------------------------------------------------------------- + +start_link() -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + {ok, ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, + [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid, ReaderPid}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader)). + +%%-------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_heartbeat_fun(SupPid) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + Parent = self(), + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 113ffcb4bc..0f00537a6a 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -69,7 +69,7 @@ -type(timer_fun() :: fun (() -> 'ok')). --spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). -spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). 
-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index 7906fbee72..f15275b538 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -45,8 +45,7 @@ -ifdef(use_specs). --spec(start_link/0 :: - () -> 'ignore' | rabbit_types:ok_or_error2(pid(), term())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (binary(), atom()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 89b2441e38..e796acf327 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery = Delivery). split_topic_key(Key) -> - re:split(Key, "\\.", [{return, list}]). + string:tokens(binary_to_list(Key), "."). topic_matches(PatternKey, RoutingKey) -> P = split_topic_key(PatternKey), diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 553faaa814..cb53185f6b 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -39,16 +39,9 @@ %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs, Protocol) -> - Parent = self(), - {ok, spawn_link( - fun () -> - %% we trap exits so that a normal termination of - %% the channel or reader process terminates us too. - process_flag(trap_exit, true), - {ok, ChannelPid} = apply(StartFun, StartArgs), - mainloop(Parent, ChannelPid, Protocol) - end)}. +start_link(Parent, ChannelPid, Protocol) -> + {ok, proc_lib:spawn_link( + fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! 
{frame, Frame}, @@ -62,12 +55,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index af1c629f41..e7d0c10177 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -52,7 +52,7 @@ -type(guid() :: binary()). --spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(guid/0 :: () -> guid()). -spec(string_guid/1 :: (any()) -> string()). -spec(binstring_guid/1 :: (any()) -> binary()). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index faddffc1fc..a9945af1d4 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,47 +31,53 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). + +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(pids() :: rabbit_types:maybe({pid(), pid()})). +-export_type([heartbeaters/0]). + +-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). + +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()). --spec(pause_monitor/1 :: (pids()) -> 'ok'). 
--spec(resume_monitor/1 :: (pids()) -> 'ok'). +-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_heartbeat(_Sock, 0) -> - none; -start_heartbeat(Sock, TimeoutSec) -> - Parent = self(), +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, - send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent) end), + heartbeater( + {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send( + Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}). + +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent) end), - {Sender, Receiver}. + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + Parent ! timeout, + stop + end}). pause_monitor(none) -> ok; @@ -87,19 +93,15 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). +heartbeater(Params) -> + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. 
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 813ccc7538..da7078f1ba 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/2, shutdown/1]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -45,8 +45,8 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> + rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -76,17 +76,10 @@ start_link(ChPid, UnackedMsgCount) -> gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). 
%% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -124,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -164,9 +156,6 @@ handle_call(unblock, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end. -handle_cast(shutdown, State) -> - {stop, normal, State}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count @@ -246,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 85bcbca04a..863f77e7eb 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -50,7 +50,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(debug/1 :: (string()) -> 'ok'). -spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index bdf3807531..f87b62713a 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -86,7 +86,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(deregister/1 :: (pid()) -> 'ok'). 
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5fa3f8edb4..086d260e24 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -39,7 +39,6 @@ -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). --export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -108,10 +107,6 @@ rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> 'ok' | rabbit_types:connection_exit()). --spec(get_config/1 :: - (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). --spec(get_config/2 :: (atom(), A) -> A). --spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: @@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) -> [Key, rabbit_misc:rs(Name), New1, Orig1]) end. -get_config(Key) -> - case dirty_read({rabbit_config, Key}) of - {ok, {rabbit_config, Key, V}} -> {ok, V}; - Other -> Other - end. - -get_config(Key, DefaultValue) -> - case get_config(Key) of - {ok, V} -> V; - {error, not_found} -> DefaultValue - end. - -set_config(Key, Value) -> - ok = mnesia:dirty_write({rabbit_config, Key, Value}). - dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 505dc28fe8..a321488897 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -91,7 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = wait_for_tables(), ok. 
is_db_empty() -> @@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force), - ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -157,57 +155,83 @@ table_definitions() -> [{rabbit_user, [{record_name, user}, {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, {rabbit_vhost, [{record_name, vhost}, {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, - {rabbit_config, - [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, - {type, bag}]}, + {type, bag}, + {match, #listener{_='_'}}]}, {rabbit_durable_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_reverse_route, [{record_name, reverse_route}, {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. 
{rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_exchange, [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + +binding_match() -> + #binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{queue_name = queue_name_match(), + exchange_name = exchange_name_match(), + _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. table_names() -> [Tab || {Tab, _} <- table_definitions()]. replicated_table_names() -> - [Tab || {Tab, Attrs} <- table_definitions(), - not lists:member({local_content, true}, Attrs) + [Tab || {Tab, TabDef} <- table_definitions(), + not lists:member({local_content, true}, TabDef) ]. dir() -> mnesia:system_info(directory). @@ -232,26 +256,55 @@ ensure_mnesia_not_running() -> yes -> throw({error, mnesia_unexpectedly_running}) end. +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. 
+ check_schema_integrity() -> - TabDefs = table_definitions(), Tables = mnesia:system_info(tables), - case [Error || Tab <- table_names(), + case [Error || {Tab, TabDef} <- table_definitions(), case lists:member(Tab, Tables) of false -> Error = {table_missing, Tab}, true; true -> - {_, TabDef} = proplists:lookup(Tab, TabDefs), {_, ExpAttrs} = proplists:lookup(attributes, TabDef), Attrs = mnesia:table_info(Tab, attributes), Error = {table_attributes_mismatch, Tab, ExpAttrs, Attrs}, Attrs /= ExpAttrs end] of - [] -> ok; + [] -> check_table_integrity(); Errors -> {error, Errors} end. +check_table_integrity() -> + ok = wait_for_tables(), + case lists:all(fun ({Tab, TabDef}) -> + {_, Match} = proplists:lookup(match, TabDef), + read_test_table(Tab, Match) + end, table_definitions()) of + true -> ok; + false -> {error, invalid_table_content} + end. + +read_test_table(Tab, Match) -> + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + true; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> true; + _ -> false + end + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -347,8 +400,9 @@ init_db(ClusterNodes, Force) -> ok = create_local_table_copies(case IsDiskNode of true -> disc; false -> ram - end) - end; + end), + ok = ensure_schema_integrity() + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -363,7 +417,9 @@ create_schema() -> cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - create_tables(). + ok = create_tables(), + ok = ensure_schema_integrity(), + ok = wait_for_tables(). move_db() -> mnesia:stop(), @@ -388,12 +444,13 @@ move_db() -> ok. 
create_tables() -> - lists:foreach(fun ({Tab, TabArgs}) -> - case mnesia:create_table(Tab, TabArgs) of + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of {atomic, ok} -> ok; {aborted, Reason} -> throw({error, {table_creation_failed, - Tab, TabArgs, Reason}}) + Tab, TabDef1, Reason}}) end end, table_definitions()), @@ -448,17 +505,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case check_schema_integrity() of - ok -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end; + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) + throw({error, {failed_waiting_for_tables, Reason}}) end. reset(Force) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 445aff487d..9be895ed35 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -126,8 +126,7 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', - startup_fun_state()) -> - 'ignore' | rabbit_types:ok_or_error2(pid(), any())). + startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> rabbit_types:ok(client_msstate())). 
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> @@ -321,7 +320,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, self(), Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, self(), Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -366,7 +365,7 @@ set_maximum_since_use(Server, Age) -> client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -383,10 +382,10 @@ client_terminate(CState) -> client_delete_and_terminate(CState, Server, Ref) -> ok = client_terminate(CState), - ok = gen_server2:call(Server, {delete_client, Ref}, infinity). + ok = gen_server2:cast(Server, {delete_client, Ref}). successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). + gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). register_sync_callback(Server, Fun) -> gen_server2:call(Server, {register_sync_callback, Fun}, infinity). 
@@ -502,7 +501,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> +init([Server, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, @@ -513,11 +512,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> {ok, IndexModule} = application:get_env(msg_store_index_module), rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), - {AllCleanShutdown, IndexState, ClientRefs1} = - recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server), + AttemptFileSummaryRecovery = + case ClientRefs of + undefined -> ok = rabbit_misc:recursive_delete([Dir]), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + false; + _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + recover_crashed_compactions(Dir) + end, + %% if we found crashed compactions we trust neither the + %% file_summary nor the location index. Note the file_summary is + %% left empty here if it can't be recovered. {FileSummaryRecovered, FileSummaryEts} = - recover_file_summary(AllCleanShutdown, Dir, Server), + recover_file_summary(AttemptFileSummaryRecovery, Dir), + + {CleanShutdown, IndexState, ClientRefs1} = + recover_index_and_client_refs(IndexModule, FileSummaryRecovered, + ClientRefs, Dir, Server), + %% CleanShutdown => msg location index and file_summary both + %% recovered correctly. + true = case {FileSummaryRecovered, CleanShutdown} of + {true, false} -> ets:delete_all_objects(FileSummaryEts); + _ -> true + end, + %% CleanShutdown <=> msg location index and file_summary both + %% recovered correctly. 
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, @@ -544,28 +564,16 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, - successfully_recovered = AllCleanShutdown, + successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, pid_to_fun = dict:new(), pid_to_guids = dict:new() }, - ok = case AllCleanShutdown of - true -> ok; - false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) - end, - - FileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), - TmpFileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), - - %% There should be no more tmp files now, so go ahead and load the - %% whole lot - Files = [filename_to_num(FileName) || FileName <- FileNames], + %% If we didn't recover the msg location index then we need to + %% rebuild it now. {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(FileSummaryRecovered, Files, State), + build_index(CleanShutdown, StartupFunState, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -606,11 +614,6 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({delete_client, CRef}, _From, - State = #msstate { client_refs = ClientRefs }) -> - reply(ok, - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); - handle_call({register_sync_callback, Fun}, {Pid, _}, State = #msstate { pid_to_fun = PTF }) -> erlang:monitor(process, Pid), @@ -618,7 +621,7 @@ handle_call({register_sync_callback, Fun}, {Pid, _}, State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }). 
-handle_cast({write, Pid, Guid, Msg}, +handle_cast({write, Pid, Guid}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, @@ -628,6 +631,7 @@ handle_cast({write, Pid, Guid, Msg}, pid_to_fun = PTF, pid_to_guids = PTG}) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> %% New message, lots to do @@ -736,7 +740,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast({delete_client, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -1082,9 +1091,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -sort_file_names(FileNames) -> +list_sorted_file_names(Dir, Ext) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, - FileNames). + filelib:wildcard("*" ++ Ext, Dir)). 
%%---------------------------------------------------------------------------- %% message cache helper functions @@ -1164,15 +1173,15 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) -> - ok = rabbit_misc:recursive_delete([Dir]), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> + {false, IndexModule:new(Dir), sets:new()}; +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> + rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), {false, IndexModule:new(Dir), sets:new()}; -recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~w: " ++ ErrorMsg ++ - "~nrebuilding indices from scratch~n", + rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" + "rebuilding indices from scratch~n", [Server | ErrorArgs]), {false, IndexModule:new(Dir), sets:new()} end, @@ -1186,12 +1195,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> andalso IndexModule =:= RecIndexModule) of true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> - ClientRefs1 = sets:from_list(ClientRefs), - {true, IndexState1, ClientRefs1}; + {true, IndexState1, + sets:from_list(ClientRefs)}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; - false -> Fresh("recovery terms differ from present", []) + false -> Fresh("recovery terms differ from present", []) end end. @@ -1212,7 +1221,7 @@ store_file_summary(Tid, Dir) -> ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), [{extended_info, [object_count]}]). 
-recover_file_summary(false, _Dir, _Server) -> +recover_file_summary(false, _Dir) -> %% TODO: the only reason for this to be an *ordered*_set is so %% that a) maybe_compact can start a traversal from the eldest %% file, and b) build_index in fast recovery mode can easily @@ -1222,15 +1231,12 @@ recover_file_summary(false, _Dir, _Server) -> %% ditching the latter would be neater. {false, ets:new(rabbit_msg_store_file_summary, [ordered_set, public, {keypos, #file_summary.file}])}; -recover_file_summary(true, Dir, Server) -> +recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> file:delete(Path), {true, Tid}; - {error, Error} -> rabbit_log:warning( - "~w: failed to recover file summary: ~p~n" - "rebuilding~n", [Server, Error]), - recover_file_summary(false, Dir, Server) + {error, _Error} -> recover_file_summary(false, Dir) end. count_msg_refs(Gen, Seed, State) -> @@ -1243,6 +1249,7 @@ count_msg_refs(Gen, Seed, State) -> ok = case index_lookup(Guid, State) of not_found -> index_insert(#msg_location { guid = Guid, + file = undefined, ref_count = Delta }, State); #msg_location { ref_count = RefCount } = StoreEntry -> @@ -1257,7 +1264,9 @@ count_msg_refs(Gen, Seed, State) -> count_msg_refs(Gen, Next, State) end. -recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> +recover_crashed_compactions(Dir) -> + FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION), + TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP), lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = @@ -1266,103 +1275,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> ok = recover_crashed_compaction( Dir, TmpFileName, NonTmpRelatedFileName) end, TmpFileNames), - ok. + TmpFileNames == []. 
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> - {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName), - {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), - %% 1) It's possible that everything in the tmp file is also in the - %% main file such that the main file is (prefix ++ - %% tmpfile). This means that compaction failed immediately - %% prior to the final step of deleting the tmp file. Plan: just - %% delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the - %% main file but with holes throughout (or just somthing like - %% main = (prefix ++ hole ++ tmpfile)). This means that - %% compaction wrote out the tmp file successfully and then - %% failed. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the - %% main file but such that the main file does not end with tmp - %% file (and there are valid messages in the suffix; main = - %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This - %% means that compaction failed as we were writing out the tmp - %% file. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which - %% are not in the main file. This means that writing out the - %% tmp file succeeded, but then we failed as we were copying - %% them back over to the main file, after truncating the main - %% file. As the main file has already been truncated, it should - %% consist only of valid messages. Plan: Truncate the main file - %% back to before any of the files in the tmp file and copy - %% them over again - TmpPath = form_filename(Dir, TmpFileName), - case is_sublist(GuidsTmp, Guids) of - true -> %% we're in case 1, 2 or 3 above. 
Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpPath); - false -> - %% We're in case 4 above. We only care about the inital - %% msgs in main file that are not in the tmp file. If - %% there are no msgs in the tmp file then we would be in - %% the 'true' branch of this case, so we know the - %% lists:last call is safe. - EldestTmpGuid = lists:last(GuidsTmp), - {Guids1, UncorruptedMessages1} - = case lists:splitwith( - fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of - {_Guids, []} -> %% no msgs from tmp in main - {Guids, UncorruptedMessages}; - {Dropped, [EldestTmpGuid | Rest]} -> - %% Msgs in Dropped are in tmp, so forget them. - %% *cry*. Lists indexed from 1. - {Rest, lists:sublist(UncorruptedMessages, - 2 + length(Dropped), - length(Rest))} - end, - %% The main file prefix should be contiguous - {Top, Guids1} = find_contiguous_block_prefix( - lists:reverse(UncorruptedMessages1)), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = is_disjoint(Guids1, GuidsTmp), - %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), - %% Wipe out any rubbish at the end of the file. Remember - %% the head of the list will be the highest entry in the - %% file. - [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize, - %% Extend the main file as big as necessary in a single - %% move. 
If we run out of disk space, this truncate could - %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), - {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), - ok = file_handle_cache:close(MainHdl), - ok = file_handle_cache:delete(TmpHdl), - - {ok, _MainMessages, GuidsMain} = - scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName), - %% check that everything in Guids1 is in GuidsMain - true = is_sublist(Guids1, GuidsMain), - %% check that everything in GuidsTmp is in GuidsMain - true = is_sublist(GuidsTmp, GuidsMain) - end, + %% Because a msg can legitimately appear multiple times in the + %% same file, identifying the contents of the tmp file and where + %% they came from is non-trivial. If we are recovering a crashed + %% compaction then we will be rebuilding the index, which can cope + %% with duplicates appearing. Thus the simplest and safest thing + %% to do is to append the contents of the tmp file to its main + %% file. + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, + ?READ_MODE ++ ?WRITE_MODE), + {ok, _End} = file_handle_cache:position(MainHdl, eof), + Size = filelib:file_size(form_filename(Dir, TmpFileName)), + {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size), + ok = file_handle_cache:close(MainHdl), + ok = file_handle_cache:delete(TmpHdl), ok. -is_sublist(SmallerL, BiggerL) -> - lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL). - -is_disjoint(SmallerL, BiggerL) -> - lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). - scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( @@ -1376,10 +1308,6 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. 
-scan_file_for_valid_messages_and_guids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1394,8 +1322,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, _Files, State = #msstate { - file_summary_ets = FileSummaryEts }) -> +build_index(true, _StartupFunState, + State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, file_size = FileSize, @@ -1407,12 +1335,17 @@ build_index(true, _Files, State = #msstate { sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(false, Files, State) -> +build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, + State = #msstate { dir = Dir }) -> + ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), {ok, Pid} = gatherer:start_link(), - case Files of - [] -> build_index(Pid, undefined, [State #msstate.current_file], State); - _ -> {Offset, State1} = build_index(Pid, undefined, Files, State), - {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} + case [filename_to_num(FileName) || + FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of + [] -> build_index(Pid, undefined, [State #msstate.current_file], + State); + Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, + State1, Files)} end. 
build_index(Gatherer, Left, [], @@ -1453,14 +1386,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case index_lookup(Guid, State) of - not_found -> - {VMAcc, VTSAcc}; - StoreEntry -> + #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, total_size = TotalSize }, State), - {[Obj | VMAcc], VTSAcc + TotalSize} + {[Obj | VMAcc], VTSAcc + TotalSize}; + _ -> + {VMAcc, VTSAcc} end end, {[], 0}, Messages), %% foldl reverses lists, find_contiguous_block_prefix needs @@ -1715,9 +1648,10 @@ find_unremoved_messages_in_file(File, {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> + lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of - #msg_location { file = File } = Entry -> + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> {[ Entry | List ], TotalSize + Size}; _ -> Acc diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index eaa411730c..c7948b7eb3 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -56,7 +56,7 @@ -ifdef(use_specs). -spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> - 'ignore' | rabbit_types:ok_or_error2(pid(), any())). + rabbit_types:ok_pid_or_error()). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). 
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 3facef17f7..c7a5a60027 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -93,7 +93,14 @@ usage() -> action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), application:load(rabbit), - NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), + {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( + getenv("RABBITMQ_NODENAME")), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); + {ok, _} -> + ok + end, {NodePids, Running} = case list_to_integer(NodeCount) of 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 3a3357ba9d..f656e04cba 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -118,7 +118,7 @@ start() -> {rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, - {rabbit_reader,start_link,[]}]}, + {rabbit_connection_sup,start_link,[]}]}, transient, infinity, supervisor, [tcp_client_sup]}), ok. @@ -204,10 +204,10 @@ on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock, SockTransform) -> - {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock, SockTransform}, - Child. + {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -230,8 +230,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, Pid, _, _} <- supervisor:which_children( - rabbit_tcp_client_sup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. 
connection_info_keys() -> rabbit_reader:info_keys(). @@ -242,8 +243,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). close_connection(Pid, Explanation) -> - case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end, - supervisor:which_children(rabbit_tcp_client_sup)) of + case lists:member(Pid, connections()) of true -> rabbit_reader:shutdown(Pid, Explanation); false -> throw({error, {not_a_connection_pid, Pid}}) end. diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index a427b13548..66e5cf6311 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -73,9 +73,8 @@ {deliver, pmsg()} | {ack, pmsg()}). --spec(start_link/1 :: - ([rabbit_amqqueue:name()]) - -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/1 :: ([rabbit_amqqueue:name()]) -> + rabbit_types:ok_pid_or_error()). -spec(transaction/1 :: ([work_item()]) -> 'ok'). -spec(extend_transaction/2 :: ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()]) diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index a170fb1da8..26274a365e 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -51,6 +51,7 @@ %%---------------------------------------------------------------------------- start() -> + io:format("Activating RabbitMQ plugins ..."), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), @@ -76,7 +77,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), + RabbitVersion = proplists:get_value(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, @@ -129,8 +130,10 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated.~n~n", [length(PluginApps)]), - 
[io:format("* ~w~n", [App]) || App <- PluginApps], + io:format("~n~w plugins activated:~n", [length(PluginApps)]), + [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) + || App <- PluginApps], + io:nl(), halt(), ok. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 9257ec82ab..0a49b94d09 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -33,7 +33,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1, shutdown/1]). +-export([start_link/0, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,10 +46,9 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). --spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,9 +63,6 @@ register(CollectorPid, Q) -> delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). -shutdown(CollectorPid) -> - gen_server:cast(CollectorPid, shutdown). - %%---------------------------------------------------------------------------- init([]) -> @@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. -handle_cast(shutdown, State) -> - {stop, normal, State}. +handle_cast(Msg, State) -> + {stop, {unhandled_cast, Msg}, State}. 
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a82da57ae7..fbc3f8aa42 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -259,8 +259,8 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - State2 = State#qistate { unsynced_guids = - [Guid | State#qistate.unsynced_guids] }, + State2 = State1 #qistate { unsynced_guids = + [Guid | State1#qistate.unsynced_guids] }, maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)). deliver(SeqIds, State) -> @@ -273,7 +273,7 @@ sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync(SeqIds, State = #qistate { journal_handle = JournalHdl, +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl, on_sync = OnSyncFun }) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d5ade90f6f..09ada1c0c8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/4, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -46,7 +46,6 @@ -export([emit_stats/1]). -import(gen_tcp). --import(fprof). -import(inet). -import(prim_inet). 
@@ -60,7 +59,8 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, - connection_state, queue_collector, heartbeater, stats_timer}). + connection_state, queue_collector, heartbeater, stats_timer, + channel_sup_sup_pid, start_heartbeat_fun}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -160,6 +160,12 @@ -ifdef(use_specs). +-type(start_heartbeat_fun() :: + fun ((rabbit_networking:socket(), non_neg_integer()) -> + rabbit_heartbeat:heartbeaters())). + +-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> + rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -168,21 +174,33 @@ -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(start_connection/7 :: + (pid(), pid(), pid(), start_heartbeat_fun(), any(), + rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, + Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). 
-init(Parent) -> +init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection( + Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + SockTransform) end. system_continue(Parent, Deb, State) -> @@ -208,33 +226,6 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). -setup_profiling() -> - Value = rabbit_misc:get_config(profiling_enabled, false), - case Value of - once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), - rabbit_misc:set_config(profiling_enabled, false), - fprof:trace(start); - true -> - rabbit_log:info("Enabling profiling for this connection.~n"), - fprof:trace(start); - false -> - ok - end, - Value. - -teardown_profiling(Value) -> - case Value of - false -> - ok; - _ -> - rabbit_log:info("Completing profiling for this connection.~n"), - fprof:trace(stop), - fprof:profile(), - fprof:analyse([{dest, []}, {cols, 100}]) - end. - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -261,7 +252,8 @@ socket_op(Sock, Fun) -> exit(normal) end. 
-start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, + Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) -> ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer()}, - handshake, 8)) + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun + }, + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. 
%% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue), - rabbit_misc:unlink_and_capture_exit(Collector), - rabbit_queue_collector:shutdown(Collector), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> exit(Reason); {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_channel_exit(Channel, Reason, State)); - {'EXIT', Pid, Reason} -> - mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); + {channel_exit, ChannelOrFrPid, Reason} -> + mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); + {'EXIT', ChSupPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,33 +433,45 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> - {channel, Channel} = get({chpid, ChPid}), +handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> + {channel, Channel} = get({ch_fr_pid, ChFrPid}), handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). 
-handle_dependent_exit(Pid, normal, State) -> - erase({chpid, Pid}), - maybe_close(State); -handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) +handle_dependent_exit(ChSupPid, Reason, State) -> + case termination_kind(Reason) of + controlled -> + case erase({ch_sup_pid, ChSupPid}) of + undefined -> ok; + {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) + end, + maybe_close(State); + uncontrolled -> + case channel_cleanup(ChSupPid) of + undefined -> + exit({abnormal_dependent_exit, ChSupPid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. -channel_cleanup(Pid) -> - case get({chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> erase({channel, Channel}), - erase({chpid, Pid}), - Channel +channel_cleanup(ChSupPid) -> + case get({ch_sup_pid, ChSupPid}) of + undefined -> undefined; + {{channel, Channel}, ChFr} -> erase({channel, Channel}), + erase(ChFr), + erase({ch_sup_pid, ChSupPid}), + Channel end. -all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. +all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, + {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. 
terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(ChFrPid) + || ChFrPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', Pid, Reason} -> - case channel_cleanup(Pid) of + {'EXIT', ChSupPid, Reason} -> + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> - case Reason of - normal -> ok; - _ -> + case termination_kind(Reason) of + controlled -> + ok; + uncontrolled -> rabbit_log:error( "connection ~p, channel ~p - " "error while terminating:~n~p~n", @@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. +termination_kind(normal) -> controlled; +termination_kind(shutdown) -> controlled; +termination_kind({shutdown, _Term}) -> controlled; +termination_kind(_) -> uncontrolled. 
+ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of - {chpid, ChPid} -> - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + {ch_fr_pid, ChFrPid} -> + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), @@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock}) -> + sock = Sock, + start_heartbeat_fun = SHF}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", @@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = rabbit_heartbeat:start_heartbeat( - Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -761,9 +769,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - State1 = State#v1{connection_state = running, - connection = NewConnection}, + State1 = internal_conserve_memory( + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State#v1{connection_state = running, + connection = NewConnection}), rabbit_event:notify( connection_created, [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), @@ -848,21 +857,21 @@ i(Item, #v1{}) 
-> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> - #v1{sock = Sock, connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost, - protocol = Protocol}} = State, - {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), - {ok, ChPid} = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector], - Protocol), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, ChSupPid, ChFrPid} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector}), + put({channel, Channel}, {ch_fr_pid, ChFrPid}), + put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), + put({ch_fr_pid, ChFrPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af40..d783c43e46 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,9 +35,6 @@ -export([all_tests/0, test_parsing/0]). -%% Exported so the hook mechanism can call back --export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). - -import(lists). -include("rabbit.hrl"). 
@@ -55,6 +52,8 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), + passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), @@ -1020,7 +1019,8 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self()), + <<"user">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1061,67 +1061,23 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. -test_hooks() -> - %% Firing of hooks calls all hooks in an isolated manner - rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:trigger(test_hook, [arg1, arg2]), - [arg1, arg2] = get(test_hook_test_fired), - [arg1, arg2] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Hook Deletion works - put(test_hook_test_fired, undefined), - put(test_hook_test2_fired, undefined), - rabbit_hooks:unsubscribe(test_hook, test), - rabbit_hooks:trigger(test_hook, [arg3, arg4]), - undefined = get(test_hook_test_fired), - [arg3, arg4] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Catches exceptions from bad hooks - rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), - ok = rabbit_hooks:trigger(test_hook3, []), - - %% Passing extra arguments to hooks - rabbit_hooks:subscribe(arg_hook, 
test, {rabbit_tests, extra_arg_hook, [1, 3]}), - rabbit_hooks:trigger(arg_hook, [arg1, arg2]), - {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), - - %% Invoking Pids - Remote = fun () -> - receive - {rabbitmq_hook,[remote_test,test,[],Target]} -> - Target ! invoked - end - end, - P = spawn(Remote), - rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), - rabbit_hooks:trigger(remote_test, []), - receive - invoked -> ok - after 100 -> - io:format("Remote hook not invoked"), - throw(timeout) - end, - passed. - test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, - <<"/">>, self()), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, + <<"guest">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. test_statistics_receiver(Pid) -> receive @@ -1160,7 +1116,7 @@ test_statistics() -> %% by far the most complex code though. %% Set up a channel and queue - {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1), rabbit_channel:do(Ch, #'queue.declare'{}), QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 @@ -1404,17 +1360,35 @@ delete_log_handlers(Handlers) -> Handler <- Handlers], ok. -handle_hook(HookName, Handler, Args) -> - A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", - put(list_to_atom(A), Args). -bad_handle_hook(_, _, _) -> - exit(bad_handle_hook_called). -extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> - handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). - test_supervisor_delayed_restart() -> test_sup:test_supervisor_delayed_restart(). 
+test_file_handle_cache() -> + %% test copying when there is just one spare handle + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( + filename:join(TmpDir, "file3"), + [write], []), + receive close -> ok end, + file_handle_cache:delete(Hdl) + end), + Src = filename:join(TmpDir, "file1"), + Dst = filename:join(TmpDir, "file2"), + Content = <<"foo">>, + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + file_handle_cache:delete(DstHdl), + Pid ! close, + ok = file_handle_cache:set_limit(Limit), + passed. + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> @@ -1624,7 +1598,8 @@ init_test_queue() -> test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end). + end, + fun (_Guids) -> ok end). restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a9313503a1..47e8bb0161 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -40,8 +40,8 @@ unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1, - channel_exit/0, connection_exit/0]). + user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, + ok_pid_or_error/0, channel_exit/0, connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). 
@@ -147,5 +147,6 @@ -type(error(A) :: {'error', A}). -type(ok_or_error(A) :: 'ok' | error(A)). -type(ok_or_error2(A, B) :: ok(A) | error(B)). +-type(ok_pid_or_error() :: ok_or_error2(pid(), any())). -endif. % use_specs diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f90ee734a8..fd5b5ba5f0 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, send_command_and_signal_back/3, - send_command_and_signal_back/4, send_command_and_notify/5]). +-export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([send_command/2, send_command/3, send_command_sync/2, + send_command_sync/3, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). @@ -48,24 +48,23 @@ -ifdef(use_specs). --spec(start/4 :: +-spec(start/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/4 :: +-spec(start_link/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: - (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok'). --spec(send_command_and_signal_back/4 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid()) - -> 'ok'). +-spec(send_command_sync/2 :: + (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command_sync/3 :: + (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). 
-spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -84,23 +83,35 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}])}. - -start_link(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -mainloop(State) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. handle_message({send_command, MethodRecord}, @@ -116,20 +127,20 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, Protocol), State; -handle_message({send_command_and_signal_back, MethodRecord, Parent}, +handle_message({send_command_sync, From, MethodRecord}, State = #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), - Parent ! 
rabbit_writer_send_command_signal, + gen_server:reply(From, ok), State; -handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, +handle_message({send_command_sync, From, {MethodRecord, Content}}, State = #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, Protocol), - Parent ! rabbit_writer_send_command_signal, + gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, @@ -144,8 +155,6 @@ handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(normal); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -159,22 +168,21 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. -send_command_and_signal_back(W, MethodRecord, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Parent}, - ok. +send_command_sync(W, MethodRecord) -> + call(W, send_command_sync, MethodRecord). -send_command_and_signal_back(W, MethodRecord, Content, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, - ok. +send_command_sync(W, MethodRecord, Content) -> + call(W, send_command_sync, {MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - rabbit_misc:unlink_and_capture_exit(W), - ok. +%--------------------------------------------------------------------------- + +call(Pid, Label, Msg) -> + {ok, Res} = gen:call(Pid, Label, Msg, infinity), + Res. 
%--------------------------------------------------------------------------- diff --git a/src/supervisor2.erl b/src/supervisor2.erl index ecb2655f60..4a1c5832b3 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -31,6 +31,11 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% +%% 4) Added an 'intrinsic' restart type. Like the transient type, this +%% type means the child should only be restarted if the child exits +%% abnormally. Unlike the transient type, if the child exits +%% normally, the supervisor itself also exits normally. +%% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% @@ -58,7 +63,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/2, check_childspecs/1]). -export([behaviour_info/1]). @@ -133,6 +138,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name) -> + [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), + Name1 =:= Name]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). 
@@ -534,13 +543,16 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); +do_restart(intrinsic, normal, Child, State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; do_restart(_, shutdown, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; -do_restart(transient, Reason, Child, State) -> +do_restart(Type, Reason, Child, State) when Type =:= transient orelse + Type =:= intrinsic -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); do_restart(temporary, Reason, Child, State) -> @@ -641,14 +653,22 @@ terminate_simple_children(Child, Dynamics, SupName) -> ok. do_terminate(Child, SupName) when Child#child.pid =/= undefined -> - case shutdown(Child#child.pid, - Child#child.shutdown) of - ok -> - Child#child{pid = undefined}; - {error, OtherReason} -> - report_error(shutdown_error, OtherReason, Child, SupName), - Child#child{pid = undefined} - end; + ReportError = fun (Reason) -> + report_error(shutdown_error, Reason, Child, SupName) + end, + case shutdown(Child#child.pid, Child#child.shutdown) of + ok -> + ok; + {error, normal} -> + case Child#child.restart_type of + permanent -> ReportError(normal); + {permanent, _Delay} -> ReportError(normal); + _ -> ok + end; + {error, OtherReason} -> + ReportError(OtherReason) + end, + Child#child{pid = undefined}; do_terminate(Child, _SupName) -> Child. @@ -834,7 +854,7 @@ supname(N,_) -> N. 
%%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} %%% RestartType is permanent | temporary | transient | -%%% {permanent, Delay} | +%%% intrinsic | {permanent, Delay} | %%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker @@ -884,6 +904,7 @@ validFunc(Func) -> throw({invalid_mfa, Func}). validRestartType(permanent) -> true; validRestartType(temporary) -> true; validRestartType(transient) -> true; +validRestartType(intrinsic) -> true; validRestartType({permanent, Delay}) -> validDelay(Delay); validRestartType({transient, Delay}) -> validDelay(Delay); validRestartType(RestartType) -> throw({invalid_restart_type, diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index cc4982c9cb..c9809ace61 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,6 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> + ok = file_handle_cache:obtain(), accept(State); handle_cast(_Msg, State) -> @@ -83,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain() catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -92,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% accept more accept(State); + handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we %% know this will fail. {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -111,7 +115,6 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). 
accept(State = #state{sock=LSock}) -> - ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index 1b78584384..02d7e0e40d 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -31,19 +31,19 @@ -module(tcp_client_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/1, start_link/2]). -export([init/1]). start_link(Callback) -> - supervisor:start_link(?MODULE, Callback). + supervisor2:start_link(?MODULE, Callback). start_link(SupName, Callback) -> - supervisor:start_link(SupName, ?MODULE, Callback). + supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{tcp_client, {M,F,A}, - temporary, brutal_kill, worker, [M]}]}}. + temporary, infinity, supervisor, [M]}]}}. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 3cbc80f819..e658f005a3 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -72,10 +72,7 @@ -ifdef(use_specs). --spec(start_link/1 :: - (float()) -> 'ignore' | - rabbit_types:error(any()) | - rabbit_types:ok(pid())). +-spec(start_link/1 :: (float()) -> {'ok', pid()} | {'error', any()}). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 01ce3535d8..595884e033 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -52,7 +52,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/1 :: (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). 
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index afa21164be..177a14533b 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -41,9 +41,8 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). --spec(start_link/1 :: (non_neg_integer()) -> - 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index a61e4cc372..42049d5068 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -44,8 +44,7 @@ -ifdef(use_specs). --spec(start_link/1 :: - (any()) -> {'ok', pid()} | 'ignore' | rabbit_types:error(any())). +-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). |
