diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-01 15:59:47 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-01 15:59:47 +0100 |
| commit | d39b2bb892ac9b4f92e27c9aa1ccb11276ec35fa (patch) | |
| tree | 79688178fb289e88a08dc948964e9d7dc23453a6 | |
| parent | 03084f29639c7889b6c4d03c4290a9e66f81eb29 (diff) | |
| parent | e62a381d53e320cb4778cfac94be18f38fb51c8f (diff) | |
| download | rabbitmq-server-git-d39b2bb892ac9b4f92e27c9aa1ccb11276ec35fa.tar.gz | |
merge default
| -rw-r--r-- | Makefile | 16 | ||||
| -rwxr-xr-x | check_xref | 291 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | packaging/windows-exe/rabbitmq_nsi.in | 6 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 27 | ||||
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 81 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 31 |
21 files changed, 670 insertions, 224 deletions
@@ -103,7 +103,7 @@ endif all: $(TARGETS) -.PHONY: plugins +.PHONY: plugins check-xref ifneq "$(PLUGINS_SRC_DIR)" "" plugins: [ -d "$(PLUGINS_SRC_DIR)/rabbitmq-server" ] || ln -s "$(CURDIR)" "$(PLUGINS_SRC_DIR)/rabbitmq-server" @@ -111,9 +111,19 @@ plugins: PLUGINS_SRC_DIR="" $(MAKE) -C "$(PLUGINS_SRC_DIR)" plugins-dist PLUGINS_DIST_DIR="$(CURDIR)/$(PLUGINS_DIR)" VERSION=$(VERSION) echo "Put your EZs here and use rabbitmq-plugins to enable them." > $(PLUGINS_DIR)/README rm -f $(PLUGINS_DIR)/rabbit_common*.ez + +# add -q to remove printout of warnings.... +check-xref: $(BEAM_TARGETS) $(PLUGINS_DIR) + rm -rf lib + ./check_xref $(PLUGINS_DIR) -q + else plugins: # Not building plugins + +check-xref: + $(info xref checks are disabled) + endif $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -217,11 +227,11 @@ stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) set-resource-alarm: all - echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ + echo "rabbit_alarm:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ $(ERL_CALL) clear-resource-alarm: all - echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \ + echo "rabbit_alarm:clear_alarm({resource_limit, $(SOURCE), node()})." | \ $(ERL_CALL) stop-node: diff --git a/check_xref b/check_xref new file mode 100755 index 0000000000..8f65f3b12b --- /dev/null +++ b/check_xref @@ -0,0 +1,291 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +-mode(compile). + +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% + +main(["-h"]) -> + io:format("usage: check_xref PluginDirectory (options)~n" + "options:~n" + " -q - quiet mode (only prints errors)~n" + " -X - disables all filters~n"); +main([PluginsDir|Argv]) -> + put({?MODULE, quiet}, lists:member("-q", Argv)), + put({?MODULE, no_filters}, lists:member("-X", Argv)), + + {ok, Cwd} = file:get_cwd(), + code:add_pathz(filename:join(Cwd, "ebin")), + LibDir = filename:join(Cwd, "lib"), + case filelib:is_dir(LibDir) of + false -> ok; + true -> os:cmd("rm -rf " ++ LibDir) + end, + Rc = try + check(Cwd, PluginsDir, LibDir, checks()) + catch + _:Err -> + io:format(user, "failed: ~p~n", [Err]), + 1 + end, + shutdown(Rc, LibDir). + +shutdown(Rc, LibDir) -> + os:cmd("rm -rf " ++ LibDir), + erlang:halt(Rc). + +check(Cwd, PluginsDir, LibDir, Checks) -> + {ok, Plugins} = file:list_dir(PluginsDir), + ok = file:make_dir(LibDir), + [begin + Source = filename:join(PluginsDir, Plugin), + Target = filename:join(LibDir, Plugin), + IsExternal = external_dependency(Plugin), + AppN = case IsExternal of + true -> filename:join(LibDir, unmangle_name(Plugin)); + false -> filename:join( + LibDir, filename:basename(Plugin, ".ez")) + end, + + report(info, "mkdir -p ~s~n", [Target]), + filelib:ensure_dir(Target), + + report(info, "cp ~s ~s~n", [Source, Target]), + {ok, _} = file:copy(Source, Target), + + report(info, "unzip -d ~s ~s~n", [LibDir, Target]), + {ok, _} = zip:unzip(Target, [{cwd, LibDir}]), + + UnpackDir = filename:join(LibDir, filename:basename(Target, ".ez")), + report(info, "mv ~s ~s~n", [UnpackDir, AppN]), + ok = file:rename(UnpackDir, AppN), + + code:add_patha(filename:join(AppN, "ebin")), + case IsExternal of + true -> App = list_to_atom(hd(string:tokens(filename:basename(AppN), + "-"))), + report(info, "loading ~p~n", [App]), + application:load(App), + store_third_party(App); + _ -> ok + end + end || Plugin <- Plugins, + lists:suffix(".ez", Plugin)], + + RabbitAppEbin = filename:join([LibDir, "rabbit", "ebin"]), + filelib:ensure_dir(filename:join(RabbitAppEbin, "foo")), + {ok, Beams} = file:list_dir("ebin"), + [{ok, _} = file:copy(filename:join("ebin", Beam), + filename:join(RabbitAppEbin, Beam)) || Beam <- Beams], + xref:start(?MODULE), + xref:set_default(?MODULE, [{verbose, false}, {warnings, false}]), + xref:set_library_path(?MODULE, code:get_path()), + xref:add_release(?MODULE, Cwd, {name, rabbit}), + store_unresolved_calls(), + Results = lists:flatten([perform_analysis(Q) || Q <- Checks]), + report(Results). + +%% +%% Analysis +%% + +perform_analysis({Query, Description, Severity}) -> + perform_analysis({Query, Description, Severity, fun(_) -> false end}); +perform_analysis({Query, Description, Severity, Filter}) -> + report_progress("Checking whether any code ~s " + "(~s)~n", [Description, Query]), + case analyse(Query) of + {ok, Analysis} -> + [filter(Result, Filter) || + Result <- process_analysis(Query, Description, + Severity, Analysis)]; + {error, Module, Reason} -> + {analysis_error, {Module, Reason}} + end. + +partition(Results) -> + lists:partition(fun({{_, L}, _}) -> L =:= error end, Results). + +analyse(Query) when is_atom(Query) -> + xref:analyse(?MODULE, Query, [{verbose, false}]); +analyse(Query) when is_list(Query) -> + xref:q(?MODULE, Query). + +process_analysis(Query, Tag, Severity, Analysis) when is_atom(Query) -> + [{{Tag, Severity}, MFA} || MFA <- Analysis]; +process_analysis(Query, Tag, Severity, Analysis) when is_list(Query) -> + [{{Tag, Severity}, Result} || Result <- Analysis]. + +checks() -> + [{"(XXL)(Lin) ((XC - UC) || (XU - X - B))", + "has call to undefined function(s)", + error, filters()}, + {"(Lin) (L - LU)", "has unused local function(s)", + error, filters()}, + {"(Lin) (LU * (X - XU))", + "has exported function(s) only used locally", + warning, filters()}, + {"(Lin) (DF * (XU + LU))", "used deprecated function(s)", + warning, filters()}]. +% {"(Lin) (X - XU)", "possibly unused export", +% warning, fun filter_unused/1}]. + +%% +%% noise filters (can be disabled with -X) - strip uninteresting analyses +%% + +filter(Result, Filter) -> + case Filter(Result) of + false -> Result; + true -> [] %% NB: this gets flattened out later on.... + end. + +filters() -> + case get({?MODULE, no_filters}) of + true -> fun(_) -> false end; + _ -> filter_chain([fun is_unresolved_call/1, fun is_callback/1, + fun is_unused/1, fun is_irrelevant/1]) + end. + +filter_chain(FnChain) -> + fun(AnalysisResult) -> + lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult)); + (_F, true) -> true + end, false, FnChain) + end. + +cleanup({{_, _},{{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}},_}}) -> {MFA1, MFA2}; +cleanup({{_, _},{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}}}) -> {MFA1, MFA2}; +cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2},_}) -> {MFA1, MFA2}; +cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2}}) -> {MFA1, MFA2}; +cleanup({{_, _}, {_,_,_}=MFA}) -> MFA; +cleanup({{_, _}, {{_,_,_}=MFA,_}}) -> MFA; +cleanup({{_,_,_}=MFA, {_,_,_}}) -> MFA; +cleanup({{_,_,_}=MFA, {_,_,_},_}) -> MFA; +cleanup(Other) -> Other. + +is_irrelevant({{M,_,_}, {_,_,_}}) -> + is_irrelevant(M); +is_irrelevant({M,_,_}) -> + is_irrelevant(M); +is_irrelevant(Mod) when is_atom(Mod) -> + lists:member(Mod, get({?MODULE, third_party})). + +is_unused({{_,_,_}=MFA, {_,_,_}}) -> + is_unused(MFA); +is_unused({M,_F,_A}) -> + lists:suffix("_tests", atom_to_list(M)); +is_unused(_) -> + false. + +is_unresolved_call({_, F, A}) -> + UC = get({?MODULE, unresolved_calls}), + sets:is_element({'$M_EXPR', F, A}, UC); +is_unresolved_call(_) -> + false. + +%% TODO: cache this.... +is_callback({M,_,_}=MFA) -> + Attributes = M:module_info(attributes), + Behaviours = proplists:append_values(behaviour, Attributes), + {_, Callbacks} = lists:foldl(fun acc_behaviours/2, {M, []}, Behaviours), + lists:member(MFA, Callbacks); +is_callback(_) -> + false. + +acc_behaviours(B, {M, CB}=Acc) -> + case catch(B:behaviour_info(callbacks)) of + [{_,_} | _] = Callbacks -> + {M, CB ++ [{M, F, A} || {F,A} <- Callbacks]}; + _ -> + Acc + end. + +%% +%% reporting/output +%% + +report(Results) -> + [report_failures(F) || F <- Results], + {Errors, Warnings} = partition(Results), + report(info, "Completed: ~p errors, ~p warnings~n", + [length(Errors), length(Warnings)]), + case length(Errors) > 0 of + true -> 1; + false -> 0 + end. + +report_failures({analysis_error, {Mod, Reason}}) -> + report(error, "~s:0 Analysis Error: ~p~n", [source_file(Mod), Reason]); +report_failures({{Tag, Level}, {{{{M,_,_},L},{{M2,F2,A2},_}},_}}) -> + report(Level, "~s:~w ~s ~p:~p/~p~n", + [source_file(M), L, Tag, M2, F2, A2]); +report_failures({{Tag, Level}, {{M,F,A},L}}) -> + report(Level, "~s:~w ~s ~p:~p/~p~n", [source_file(M), L, Tag, M, F, A]); +report_failures({{Tag, Level}, {M,F,A}}) -> + report(Level, "~s:unknown ~s ~p:~p/~p~n", [source_file(M), Tag, M, F, A]); +report_failures(Term) -> + report(error, "Ignoring ~p~n", [Term]), + ok. + +report_progress(Fmt, Args) -> + report(info, Fmt, Args). + +report(Level, Fmt, Args) -> + case {get({?MODULE, quiet}), Level} of + {true, error} -> do_report(lookup_prefix(Level), Fmt, Args); + {false, _} -> do_report(lookup_prefix(Level), Fmt, Args); + _ -> ok + end. + +do_report(Prefix, Fmt, Args) -> + io:format(Prefix ++ Fmt, Args). + +lookup_prefix(error) -> "ERROR: "; +lookup_prefix(warning) -> "WARNING: "; +lookup_prefix(info) -> "INFO: ". + +source_file(M) -> + proplists:get_value(source, M:module_info(compile)). + +%% +%% setup/code-path/file-system ops +%% + +store_third_party(App) -> + {ok, AppConfig} = application:get_all_key(App), + case get({?MODULE, third_party}) of + undefined -> + put({?MODULE, third_party}, + proplists:get_value(modules, AppConfig)); + Modules -> + put({?MODULE, third_party}, + proplists:get_value(modules, AppConfig) ++ Modules) + end. + +%% TODO: this ought not to be maintained in such a fashion +external_dependency(Path) -> + lists:any(fun(P) -> lists:prefix(P, Path) end, + ["mochiweb", "webmachine", "rfc4627", "eldap"]). + +unmangle_name(Path) -> + [Name, Vsn | _] = re:split(Path, "-", [{return, list}]), + string:join([Name, Vsn], "-"). + +store_unresolved_calls() -> + {ok, UCFull} = analyse("UC"), + UC = [MFA || {_, {_,_,_} = MFA} <- UCFull], + put({?MODULE, unresolved_calls}, sets:from_list(UC)). diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 65308f8ee3..7884228169 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -25,6 +25,7 @@ %% 0 ("no limit") would make a better default, but that %% breaks the QPid Java client {frame_max, 131072}, + {heartbeat, 600}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8b4a6232e..d6fac46db7 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,8 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes, policy}). + arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, + policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 91510991cc..f5257040ef 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -101,7 +101,9 @@ Section "RabbitMQ Service" RabbitService ExpandEnvStrings $0 %COMSPEC% ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install' ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start' - CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie" + ReadEnvStr $1 "HOMEDRIVE" + ReadEnvStr $2 "HOMEPATH" + CopyFiles "$WINDIR\.erlang.cookie" "$1$2\.erlang.cookie" SectionEnd ;-------------------------------- @@ -234,4 +236,4 @@ Function findErlang System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0' ${EndIf} -FunctionEnd
\ No newline at end of file +FunctionEnd diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 13ee42499a..68c095d2d5 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -150,8 +150,8 @@ info/0, info/1]). -export([ulimit/0]). --export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). +-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -195,7 +195,9 @@ obtain_count, obtain_pending, clients, - timer_ref + timer_ref, + alarm_set, + alarm_clear }). -record(cstate, @@ -268,7 +270,11 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + start_link(fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1). + +start_link(AlarmSet, AlarmClear) -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear], + [{timeout, infinity}]). register_callback(M, F, A) when is_atom(M) andalso is_atom(F) andalso is_list(A) -> @@ -806,7 +812,7 @@ i(Item, _) -> throw({bad_argument, Item}). %% gen_server2 callbacks %%---------------------------------------------------------------------------- -init([]) -> +init([AlarmSet, AlarmClear]) -> Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> @@ -830,7 +836,9 @@ init([]) -> obtain_count = 0, obtain_pending = pending_new(), clients = Clients, - timer_ref = undefined }}. + timer_ref = undefined, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }}. prioritise_cast(Msg, _State) -> case Msg of @@ -1026,10 +1034,11 @@ obtain_limit_reached(#fhc_state { obtain_limit = Limit, obtain_count = Count}) -> Limit =/= infinity andalso Count >= Limit. -adjust_alarm(OldState, NewState) -> +adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, + alarm_clear = AlarmClear }, NewState) -> case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of - {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []}); - {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit); + {false, true} -> AlarmSet({file_descriptor_limit, []}); + {true, false} -> AlarmClear(file_descriptor_limit); _ -> ok end, NewState. diff --git a/src/rabbit.erl b/src/rabbit.erl index 23b422af33..f7953c55d1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -20,7 +20,8 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, status/0, is_running/0, - is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). + is_running/1, environment/0, rotate_logs/1, force_event_refresh/0, + start_fhc/0]). -export([start/2, stop/1]). @@ -53,8 +54,7 @@ -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, - {mfa, {rabbit_sup, start_restartable_child, - [file_handle_cache]}}, + {mfa, {rabbit, start_fhc, []}}, {requires, pre_boot}, {enables, worker_pool}]}). @@ -731,3 +731,10 @@ config_files() -> [File] <- Files]; error -> [] end. + +%% We don't want this in fhc since it references rabbit stuff. And we can't put +%% this in the bootstep directly. +start_fhc() -> + rabbit_sup:start_restartable_child( + file_handle_cache, + [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d16d90a45d..e6625b2b90 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,22 +18,28 @@ -behaviour(gen_event). --export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]). +-export([start_link/0, start/0, stop/0, register/2, set_alarm/1, + clear_alarm/1, get_alarms/0, on_node_up/1, on_node_down/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -export([remote_conserve_resources/3]). %% Internal use only --record(alarms, {alertees, alarmed_nodes}). +-define(SERVER, ?MODULE). + +-record(alarms, {alertees, alarmed_nodes, alarms}). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(set_alarm/1 :: (any()) -> 'ok'). +-spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). @@ -41,59 +47,70 @@ %%---------------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?SERVER}). + start() -> - ok = alarm_handler:add_alarm_handler(?MODULE, []), + ok = rabbit_sup:start_restartable_child(?MODULE), + ok = gen_event:add_handler(?SERVER, ?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), - rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), - + rabbit_sup:start_restartable_child( + vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1, + fun rabbit_alarm:clear_alarm/1]), {ok, DiskLimit} = application:get_env(disk_free_limit), rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. -stop() -> - ok = alarm_handler:delete_alarm_handler(?MODULE). +stop() -> ok. register(Pid, HighMemMFA) -> - gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, + gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA}, infinity). -on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}). +set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}). +clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}). + +get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity). -on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). +on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}). +on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}). -%% Can't use alarm_handler:{set,clear}_alarm because that doesn't -%% permit notifying a remote node. remote_conserve_resources(Pid, Source, true) -> - gen_event:notify({alarm_handler, node(Pid)}, + gen_event:notify({?SERVER, node(Pid)}, {set_alarm, {{resource_limit, Source, node()}, []}}); remote_conserve_resources(Pid, Source, false) -> - gen_event:notify({alarm_handler, node(Pid)}, + gen_event:notify({?SERVER, node(Pid)}, {clear_alarm, {resource_limit, Source, node()}}). + %%---------------------------------------------------------------------------- init([]) -> {ok, #alarms{alertees = dict:new(), - alarmed_nodes = dict:new()}}. + alarmed_nodes = dict:new(), + alarms = []}}. handle_call({register, Pid, HighMemMFA}, State) -> {ok, 0 < dict:size(State#alarms.alarmed_nodes), internal_register(Pid, HighMemMFA, State)}; +handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> + {ok, Alarms, State}; + handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> - {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; +handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> + handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]}); -handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> - {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; +handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> + handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1, + Alarms)}); handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( - {alarm_handler, Node}, + {?SERVER, Node}, {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; @@ -186,3 +203,25 @@ internal_register(Pid, {M, F, A} = HighMemMFA, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), State#alarms{alertees = NewAlertees}. + +handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> + rabbit_log:warning("~s resource limit alarm set on node ~p~n", + [Source, Node]), + {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; +handle_set_alarm({file_descriptor_limit, []}, State) -> + rabbit_log:warning("file descriptor limit alarm set~n"), + {ok, State}; +handle_set_alarm(Alarm, State) -> + rabbit_log:warning("alarm '~p' set~n", [Alarm]), + {ok, State}. + +handle_clear_alarm({resource_limit, Source, Node}, State) -> + rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", + [Source, Node]), + {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; +handle_clear_alarm(file_descriptor_limit, State) -> + rabbit_log:warning("file descriptor limit alarm cleared~n"), + {ok, State}; +handle_clear_alarm(Alarm, State) -> + rabbit_log:warning("alarm '~p' cleared~n", [Alarm]), + {ok, State}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index afbaea651b..a5f227bc19 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0]). +-export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). @@ -102,6 +102,7 @@ -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). +-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -215,6 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], + sync_slave_pids = [], mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); @@ -475,6 +477,8 @@ force_event_refresh(QNames) -> force_event_refresh(Failed) end. +wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). @@ -599,7 +603,7 @@ on_node_down(Node) -> slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node andalso - not is_process_alive(Pid)])), + not rabbit_misc:is_process_alive(Pid)])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, @@ -672,13 +676,18 @@ qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). safe_delegate_call_ok(F, Pids) -> - case delegate:invoke(Pids, fun (Pid) -> - rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> F(Pid) end) - end) of - {_, []} -> ok; - {_, Bad} -> {error, Bad} + {_, Bads} = delegate:invoke(Pids, fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> F(Pid) end) + end), + case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) -> + rabbit_misc:is_abnormal_exit(R); + ({_Pid, _}) -> + false + end, Bads) of + [] -> ok; + Bads1 -> {error, Bads1} end. delegate_call(Pid, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8933de8746..b4071627ce 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -93,6 +93,7 @@ consumers, memory, slave_pids, + synchronised_slave_pids, backing_queue_status ]). @@ -102,9 +103,7 @@ durable, auto_delete, arguments, - owner_pid, - slave_pids, - synchronised_slave_pids + owner_pid ]). -define(INFO_KEYS, @@ -788,7 +787,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> case pmon:is_monitored(QPid, QMons) of false -> noreply(State); - true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> case rabbit_misc:is_abnormal_exit(Reason) of true -> {Lost, _UC1} = dtree:take_all(QPid, UC), QNameS = rabbit_misc:rs(qname(State)), rabbit_log:warning("DLQ ~p for ~s died with " @@ -893,37 +892,7 @@ make_dead_letter_msg(Reason, now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> - {Prefix, Items1} = - case lists:member(synchronised_slave_pids, Items) of - true -> Prefix1 = slaves_status(State), - case lists:member(slave_pids, Items) of - true -> {Prefix1, Items -- [slave_pids]}; - false -> {proplists:delete(slave_pids, Prefix1), Items} - end; - false -> {[], Items} - end, - Prefix ++ [{Item, i(Item, State)} - || Item <- (Items1 -- [synchronised_slave_pids])]. - -slaves_status(#q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> - [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - {ok, #amqqueue{slave_pids = SPids}} -> - {Results, _Bad} = - delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), - {SPids1, SSPids} = - lists:foldl( - fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> - {[Pid | SPidsN], - case proplists:get_bool(is_synchronised, Infos) of - true -> [Pid | SSPidsN]; - false -> SSPidsN - end} - end, {[], []}, Results), - [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] - end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -957,9 +926,14 @@ i(memory, _) -> M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; {ok, #amqqueue{slave_pids = SPids}} -> SPids end; +i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; + {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1307,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> - dead_letter_msg(Msg, AckTag, Reason, State). + dead_letter_msg(Msg, AckTag, Reason, State); + +handle_cast(wake_up, State) -> + noreply(State). %% We need to not ignore this as we need to remove outstanding %% confirms due to queue death. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 864e100afc..69fe0edca3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -405,17 +405,17 @@ handle_exception(Reason, State = #ch{protocol = Protocol, writer_pid = WriterPid, reader_pid = ReaderPid, conn_pid = ConnPid}) -> - {CloseChannel, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [ConnPid, Channel, Reason]), %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), - case CloseChannel of - Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), - {noreply, State1}; - _ -> ReaderPid ! {channel_exit, Channel, Reason}, - {stop, normal, State1} + case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of + {Channel, CloseMethod} -> + rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n", + [ConnPid, Channel, Reason]), + ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + {0, _} -> + ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} end. precondition_failed(Format) -> precondition_failed(Format, []). @@ -1116,7 +1116,7 @@ monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, delivering_queues = sets:add_element(QPid, DQ)}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> - case rabbit_misc:is_abnormal_termination(Reason) of + case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), send_nacks(MXs, State#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index ebfc563691..0b1902509d 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -196,11 +196,11 @@ print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) -> print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg). print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> - case Results = rpc_call(Node, Module, InfoFun, VHostArg) of - [_|_] -> InfoItems = rpc_call(Node, Module, KeysFun, []), - display_row([atom_to_list(I) || I <- InfoItems]), - display_info_list(Results, InfoItems); - _ -> ok + case rpc_call(Node, Module, InfoFun, VHostArg) of + [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []), + display_row([atom_to_list(I) || I <- InfoItems]), + display_info_list(Results, InfoItems); + _ -> ok end, io:nl(). @@ -452,14 +452,13 @@ action(list_parameters, Node, Args = [], _Opts, Inform) -> rabbit_runtime_parameters:info_keys()); action(report, Node, _Args, _Opts, Inform) -> - io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]), + Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []), Action <- [status, cluster_status, environment]], VHosts = unsafe_rpc(Node, rabbit_vhost, list, []), [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], - io:format("End of server status report~n"), ok; action(eval, Node, [Expr], _Opts, _Inform) -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index c07ad832f0..c87b1dc1ec 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -40,7 +40,6 @@ (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). - -spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok'). -endif. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 58375abb45..e72181c061 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -149,10 +149,10 @@ internal_update(State = #state { limit = Limit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), - alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> emit_update_info("below limit", CurrentFreeBytes, LimitBytes), - alarm_handler:clear_alarm({resource_limit, disk, node()}); + rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index e6470b721e..9a793aab09 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -54,13 +54,13 @@ %% called when comparing exchanges for equivalence - should return ok or %% exit with #amqp_error{} --callback assert_args_equivalence (rabbit_types:exchange(), - rabbit_framing:amqp_table()) -> +-callback assert_args_equivalence(rabbit_types:exchange(), + rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). %% called when the policy attached to this exchange changes. --callback policy_changed ( - serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. +-callback policy_changed(serial(), rabbit_types:exchange(), + rabbit_types:exchange()) -> 'ok'. -else. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe55..29e2d29f5c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,7 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4]). + report_deaths/4, store_updated_slaves/1]). -include("rabbit.hrl"). @@ -37,6 +37,8 @@ -spec(add_mirror/3 :: (rabbit_types:vhost(), binary(), atom()) -> rabbit_types:ok_or_error(any())). +-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> + rabbit_types:amqqueue()). -endif. @@ -58,11 +60,13 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + not lists:member(node(Pid), + DeadNodes) orelse + rabbit_misc:is_process_alive(Pid)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, []}; @@ -70,9 +74,9 @@ remove_from_queue(QueueName, DeadPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }, - ok = rabbit_amqqueue:store_queue(Q1), + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -134,22 +138,40 @@ add_mirror(Queue, MirrorNode) -> Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> case rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]) of - {ok, undefined} -> %% Already running - ok; - {ok, SPid} -> - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; - Other -> - Other - end; - [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + {error,{queue_already_mirrored_on_node, + MirrorNode}}; + false -> + start_child(Name, MirrorNode, Q) + end end end). +start_child(Name, MirrorNode, Q) -> + case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + {ok, undefined} -> + %% this means the mirror process was + %% already running on the given node. + ok; + {ok, SPid} -> + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + ok; + {error, {{stale_master_pid, StalePid}, _}} -> + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, StalePid]), + ok; + {error, {{duplicate_live_master, _}=Err, _}} -> + throw(Err); + Other -> + Other + end. + if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( Queue, fun (#amqqueue { arguments = Args } = Q) -> @@ -172,3 +194,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + +store_updated_slaves(Q = #amqqueue{slave_pids = SPids, + sync_slave_pids = SSPids}) -> + SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + ok = rabbit_amqqueue:store_queue(Q1), + %% Wake it up so that we emit a stats event + rabbit_amqqueue:wake_up(Q1), + Q1. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 60d3e027fb..e4d78c45da 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -101,19 +101,10 @@ info(QPid) -> init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [SPid] -> true = rabbit_misc:is_process_alive(SPid), - existing - end - end) of + case rabbit_misc:execute_mnesia_transaction(fun() -> + init_it(Self, Node, + QueueName) + end) of {new, MPid} -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -150,10 +141,38 @@ init(#amqqueue { name = QueueName } = Q) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; + {stale, StalePid} -> + {stop, {stale_master_pid, StalePid}}; + duplicate_live_master -> + {stop, {duplicate_live_master, Node}}; existing -> ignore end. +init_it(Self, Node, QueueName) -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> + MPids1 = MPids ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid}; + [QPid] -> + case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> MPids1 = (MPids -- [SPid]) ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid} + end + end. + handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the @@ -447,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_event:notify(queue_slave_promoted, [{pid, self()}, - {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, @@ -915,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, %% unsynchronised: we assert that can never happen. set_synchronised(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> - rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, - {name, QName}]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2) + end + end), State #state { synchronised = true }; set_synchronised(true, State) -> State; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 51a0c4f06f..6e88de8201 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -29,8 +29,8 @@ -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). -export([confirm_to_sender/2]). --export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). --export([is_abnormal_termination/1]). +-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, + filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). @@ -63,6 +63,11 @@ -export([version/0]). -export([sequence_error/1]). +%% Horrible macro to use in guards +-define(IS_BENIGN_EXIT(R), + R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; + R =:= shutdown). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -139,8 +144,8 @@ -spec(throw_on_error/2 :: (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(is_abnormal_exit/1 :: (any()) -> boolean()). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(is_abnormal_termination/1 :: (any()) -> boolean()). -spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) @@ -429,13 +434,14 @@ with_exit_handler(Handler, Thunk) -> try Thunk() catch - exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> - Handler(); - exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> - Handler() + exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler(); + exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler() end. +is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit(_) -> true. + filter_exit_map(F, L) -> Ref = make_ref(), lists:filter(fun (R) -> R =/= Ref end, @@ -443,11 +449,6 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). -is_abnormal_termination(Reason) - when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> false; -is_abnormal_termination({shutdown, _}) -> false; -is_abnormal_termination(_) -> true. with_user(Username, Thunk) -> fun () -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7524600785..19dac70c79 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -313,7 +313,7 @@ handle_other(handshake_timeout, Deb, State) mainloop(Deb, State); handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); -handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> +handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> mainloop(Deb, State); handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> throw({heartbeat_timeout, S}); @@ -355,9 +355,9 @@ switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> - {normal, send_exception(State, 0, - rabbit_misc:amqp_error( - connection_forced, Explanation, [], none))}; + {normal, handle_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. @@ -411,8 +411,6 @@ handle_dependent_exit(ChPid, Reason, State) -> {_Channel, controlled} -> maybe_close(control_throttle(State)); {Channel, uncontrolled} -> - log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), maybe_close(handle_exception(control_throttle(State), Channel, Reason)) end. @@ -470,19 +468,46 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> +handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), closed, Channel, Reason]), State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> +handle_exception(State = #v1{connection = #connection{protocol = Protocol}, + connection_state = CS}, + Channel, Reason) + when ?IS_RUNNING(State) orelse CS =:= closing -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), CS, Channel, Reason]), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), terminate_channels(), State1 = close_connection(State), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), - State1. + State1; +handle_exception(State, Channel, Reason) -> + %% We don't trust the client at this point - force them to wait + %% for a bit so they can't DOS us with repeated failed logins etc. + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({handshake_error, State#v1.connection_state, Channel, Reason}). + +frame_error(Error, Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(frame_error, + "type ~p, ~s octets = ~p: ~p", + [Type, Str, Bin, Error], none)). + +unexpected_frame(Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(unexpected_frame, + "type ~p, ~s octets = ~p", + [Type, Str, Bin], none)). + +payload_snippet(Payload) when size(Payload) =< 16 -> + {"all", Payload}; +payload_snippet(<<Snippet:16/binary, _/binary>>) -> + {"first 16", Snippet}. %%-------------------------------------------------------------------------- @@ -501,7 +526,7 @@ create_channel(Channel, State) -> MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), - ok. + {ChPid, AState}. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of @@ -532,34 +557,32 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, 0, Type, Payload}); + error -> frame_error(unknown_frame, Type, 0, Payload, State); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); - Other -> throw({unexpected_frame_on_channel0, Other}) + _Other -> unexpected_frame(Type, 0, Payload, State) end; handle_frame(Type, Channel, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_RUNNING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, Channel, Type, Payload}); - heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) - end. + error -> frame_error(unknown_frame, Type, Channel, Payload, State); + heartbeat -> unexpected_frame(Type, Channel, Payload, State); + Frame -> process_frame(Frame, Channel, State) + end; +handle_frame(Type, Channel, Payload, State) -> + unexpected_frame(Type, Channel, Payload, State). process_frame(Frame, Channel, State) -> - case get({channel, Channel}) of - {ChPid, AState} -> - case process_channel_frame(Frame, ChPid, AState) of - {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {error, Reason} -> handle_exception(State, Channel, Reason) - end; - undefined when ?IS_RUNNING(State) -> - ok = create_channel(Channel, State), - process_frame(Frame, Channel, State); - undefined -> - throw({channel_frame_while_starting, - Channel, State#v1.connection_state, Frame}) + {ChPid, AState} = case get({channel, Channel}) of + undefined -> create_channel(Channel, State); + Other -> Other + end, + case process_channel_frame(Frame, ChPid, AState) of + {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {error, Reason} -> handle_exception(State, Channel, Reason) end. process_channel_frame(Frame, ChPid, AState) -> @@ -594,14 +617,13 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1)); -handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, - State) -> - case PayloadAndMarker of - <<Payload:PayloadSize/binary, ?FRAME_END>> -> - switch_callback(handle_frame(Type, Channel, Payload, State), - frame_header, 7); - _ -> - throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) +handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> + <<Payload:PayloadSize/binary, EndMarker>> = Data, + case EndMarker of + ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), + switch_callback(State1, frame_header, 7); + _ -> frame_error({invalid_frame_end_marker, EndMarker}, + Type, Channel, Payload, State) end; %% The two rules pertaining to version negotiation: @@ -672,24 +694,14 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - HandleException = - fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end - end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:#amqp_error{method = none} = Reason -> - HandleException(Reason#amqp_error{method = MethodName}); + handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> - HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) + Stack = erlang:get_stacktrace(), + handle_exception(State, 0, {Type, Reason, MethodName, Stack}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, @@ -793,6 +805,10 @@ server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. +server_heartbeat() -> + {ok, Heartbeat} = application:get_env(rabbit, heartbeat), + Heartbeat. + send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -844,7 +860,7 @@ auth_phase(Response, {ok, User} -> Tune = #'connection.tune'{channel_max = 0, frame_max = server_frame_max(), - heartbeat = 0}, + heartbeat = server_heartbeat()}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User}} diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 18704807ba..47b22b98f6 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -40,6 +40,7 @@ -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). +-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -62,6 +63,7 @@ -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). +-spec(sync_slave_pids/0 :: () -> 'ok'). -endif. @@ -240,6 +242,19 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +sync_slave_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddSyncSlavesFun = + fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) -> + {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol} + end, + [ok = transform(T, AddSyncSlavesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, mirror_nodes, policy]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fb184d1ab2..df5f73e749 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -27,7 +27,7 @@ -behaviour(gen_server). --export([start_link/1]). +-export([start_link/1, start_link/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -51,7 +51,9 @@ memory_limit, timeout, timer, - alarmed + alarmed, + alarm_set, + alarm_clear }). %%---------------------------------------------------------------------------- @@ -59,6 +61,8 @@ -ifdef(use_specs). -spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'), + fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). @@ -99,14 +103,21 @@ get_memory_limit() -> %% gen_server callbacks %%---------------------------------------------------------------------------- -start_link(Args) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). +start_link(MemFraction) -> + start_link(MemFraction, + fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1). -init([MemFraction]) -> +start_link(MemFraction, AlarmSet, AlarmClear) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, + [MemFraction, AlarmSet, AlarmClear], []). + +init([MemFraction, AlarmSet, AlarmClear]) -> TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, timer = TRef, - alarmed = false}, + alarmed = false, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }, {ok, set_mem_limits(State, MemFraction)}. handle_call(get_vm_memory_high_watermark, _From, State) -> @@ -175,16 +186,18 @@ set_mem_limits(State, MemFraction) -> memory_limit = MemLim }). internal_update(State = #state { memory_limit = MemLimit, - alarmed = Alarmed}) -> + alarmed = Alarmed, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }) -> MemUsed = erlang:memory(total), NewAlarmed = MemUsed > MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); + AlarmSet({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({resource_limit, memory, node()}); + AlarmClear({resource_limit, memory, node()}); _ -> ok end, |
