summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-01 15:59:47 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-01 15:59:47 +0100
commitd39b2bb892ac9b4f92e27c9aa1ccb11276ec35fa (patch)
tree79688178fb289e88a08dc948964e9d7dc23453a6
parent03084f29639c7889b6c4d03c4290a9e66f81eb29 (diff)
parente62a381d53e320cb4778cfac94be18f38fb51c8f (diff)
downloadrabbitmq-server-git-d39b2bb892ac9b4f92e27c9aa1ccb11276ec35fa.tar.gz
merge default
-rw-r--r--Makefile16
-rwxr-xr-xcheck_xref291
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl3
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in6
-rw-r--r--src/file_handle_cache.erl27
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_alarm.erl81
-rw-r--r--src/rabbit_amqqueue.erl27
-rw-r--r--src/rabbit_amqqueue_process.erl51
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_control_main.erl13
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_disk_monitor.erl4
-rw-r--r--src/rabbit_exchange_type.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl71
-rw-r--r--src/rabbit_mirror_queue_slave.erl60
-rw-r--r--src/rabbit_misc.erl27
-rw-r--r--src/rabbit_reader.erl128
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/vm_memory_monitor.erl31
21 files changed, 670 insertions, 224 deletions
diff --git a/Makefile b/Makefile
index 0e3960dcfa..f3729cfa41 100644
--- a/Makefile
+++ b/Makefile
@@ -103,7 +103,7 @@ endif
all: $(TARGETS)
-.PHONY: plugins
+.PHONY: plugins check-xref
ifneq "$(PLUGINS_SRC_DIR)" ""
plugins:
[ -d "$(PLUGINS_SRC_DIR)/rabbitmq-server" ] || ln -s "$(CURDIR)" "$(PLUGINS_SRC_DIR)/rabbitmq-server"
@@ -111,9 +111,19 @@ plugins:
PLUGINS_SRC_DIR="" $(MAKE) -C "$(PLUGINS_SRC_DIR)" plugins-dist PLUGINS_DIST_DIR="$(CURDIR)/$(PLUGINS_DIR)" VERSION=$(VERSION)
echo "Put your EZs here and use rabbitmq-plugins to enable them." > $(PLUGINS_DIR)/README
rm -f $(PLUGINS_DIR)/rabbit_common*.ez
+
+# add -q to remove printout of warnings....
+check-xref: $(BEAM_TARGETS) $(PLUGINS_DIR)
+ rm -rf lib
+ ./check_xref $(PLUGINS_DIR) -q
+
else
plugins:
# Not building plugins
+
+check-xref:
+ $(info xref checks are disabled)
+
endif
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -217,11 +227,11 @@ stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
set-resource-alarm: all
- echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
+ echo "rabbit_alarm:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
clear-resource-alarm: all
- echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
+ echo "rabbit_alarm:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/check_xref b/check_xref
new file mode 100755
index 0000000000..8f65f3b12b
--- /dev/null
+++ b/check_xref
@@ -0,0 +1,291 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+-mode(compile).
+
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+main(["-h"]) ->
+ io:format("usage: check_xref PluginDirectory (options)~n"
+ "options:~n"
+ " -q - quiet mode (only prints errors)~n"
+ " -X - disables all filters~n");
+main([PluginsDir|Argv]) ->
+ put({?MODULE, quiet}, lists:member("-q", Argv)),
+ put({?MODULE, no_filters}, lists:member("-X", Argv)),
+
+ {ok, Cwd} = file:get_cwd(),
+ code:add_pathz(filename:join(Cwd, "ebin")),
+ LibDir = filename:join(Cwd, "lib"),
+ case filelib:is_dir(LibDir) of
+ false -> ok;
+ true -> os:cmd("rm -rf " ++ LibDir)
+ end,
+ Rc = try
+ check(Cwd, PluginsDir, LibDir, checks())
+ catch
+ _:Err ->
+ io:format(user, "failed: ~p~n", [Err]),
+ 1
+ end,
+ shutdown(Rc, LibDir).
+
+shutdown(Rc, LibDir) ->
+ os:cmd("rm -rf " ++ LibDir),
+ erlang:halt(Rc).
+
+check(Cwd, PluginsDir, LibDir, Checks) ->
+ {ok, Plugins} = file:list_dir(PluginsDir),
+ ok = file:make_dir(LibDir),
+ [begin
+ Source = filename:join(PluginsDir, Plugin),
+ Target = filename:join(LibDir, Plugin),
+ IsExternal = external_dependency(Plugin),
+ AppN = case IsExternal of
+ true -> filename:join(LibDir, unmangle_name(Plugin));
+ false -> filename:join(
+ LibDir, filename:basename(Plugin, ".ez"))
+ end,
+
+ report(info, "mkdir -p ~s~n", [Target]),
+ filelib:ensure_dir(Target),
+
+ report(info, "cp ~s ~s~n", [Source, Target]),
+ {ok, _} = file:copy(Source, Target),
+
+ report(info, "unzip -d ~s ~s~n", [LibDir, Target]),
+ {ok, _} = zip:unzip(Target, [{cwd, LibDir}]),
+
+ UnpackDir = filename:join(LibDir, filename:basename(Target, ".ez")),
+ report(info, "mv ~s ~s~n", [UnpackDir, AppN]),
+ ok = file:rename(UnpackDir, AppN),
+
+ code:add_patha(filename:join(AppN, "ebin")),
+ case IsExternal of
+ true -> App = list_to_atom(hd(string:tokens(filename:basename(AppN),
+ "-"))),
+ report(info, "loading ~p~n", [App]),
+ application:load(App),
+ store_third_party(App);
+ _ -> ok
+ end
+ end || Plugin <- Plugins,
+ lists:suffix(".ez", Plugin)],
+
+ RabbitAppEbin = filename:join([LibDir, "rabbit", "ebin"]),
+ filelib:ensure_dir(filename:join(RabbitAppEbin, "foo")),
+ {ok, Beams} = file:list_dir("ebin"),
+ [{ok, _} = file:copy(filename:join("ebin", Beam),
+ filename:join(RabbitAppEbin, Beam)) || Beam <- Beams],
+ xref:start(?MODULE),
+ xref:set_default(?MODULE, [{verbose, false}, {warnings, false}]),
+ xref:set_library_path(?MODULE, code:get_path()),
+ xref:add_release(?MODULE, Cwd, {name, rabbit}),
+ store_unresolved_calls(),
+ Results = lists:flatten([perform_analysis(Q) || Q <- Checks]),
+ report(Results).
+
+%%
+%% Analysis
+%%
+
+perform_analysis({Query, Description, Severity}) ->
+ perform_analysis({Query, Description, Severity, fun(_) -> false end});
+perform_analysis({Query, Description, Severity, Filter}) ->
+ report_progress("Checking whether any code ~s "
+ "(~s)~n", [Description, Query]),
+ case analyse(Query) of
+ {ok, Analysis} ->
+ [filter(Result, Filter) ||
+ Result <- process_analysis(Query, Description,
+ Severity, Analysis)];
+ {error, Module, Reason} ->
+ {analysis_error, {Module, Reason}}
+ end.
+
+partition(Results) ->
+ lists:partition(fun({{_, L}, _}) -> L =:= error end, Results).
+
+analyse(Query) when is_atom(Query) ->
+ xref:analyse(?MODULE, Query, [{verbose, false}]);
+analyse(Query) when is_list(Query) ->
+ xref:q(?MODULE, Query).
+
+process_analysis(Query, Tag, Severity, Analysis) when is_atom(Query) ->
+ [{{Tag, Severity}, MFA} || MFA <- Analysis];
+process_analysis(Query, Tag, Severity, Analysis) when is_list(Query) ->
+ [{{Tag, Severity}, Result} || Result <- Analysis].
+
+checks() ->
+ [{"(XXL)(Lin) ((XC - UC) || (XU - X - B))",
+ "has call to undefined function(s)",
+ error, filters()},
+ {"(Lin) (L - LU)", "has unused local function(s)",
+ error, filters()},
+ {"(Lin) (LU * (X - XU))",
+ "has exported function(s) only used locally",
+ warning, filters()},
+ {"(Lin) (DF * (XU + LU))", "used deprecated function(s)",
+ warning, filters()}].
+% {"(Lin) (X - XU)", "possibly unused export",
+% warning, fun filter_unused/1}].
+
+%%
+%% noise filters (can be disabled with -X) - strip uninteresting analyses
+%%
+
+filter(Result, Filter) ->
+ case Filter(Result) of
+ false -> Result;
+ true -> [] %% NB: this gets flattened out later on....
+ end.
+
+filters() ->
+ case get({?MODULE, no_filters}) of
+ true -> fun(_) -> false end;
+ _ -> filter_chain([fun is_unresolved_call/1, fun is_callback/1,
+ fun is_unused/1, fun is_irrelevant/1])
+ end.
+
+filter_chain(FnChain) ->
+ fun(AnalysisResult) ->
+ lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ (_F, true) -> true
+ end, false, FnChain)
+ end.
+
+cleanup({{_, _},{{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}},_}}) -> {MFA1, MFA2};
+cleanup({{_, _},{{{_,_,_}=MFA1,_},{{_,_,_}=MFA2,_}}}) -> {MFA1, MFA2};
+cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2},_}) -> {MFA1, MFA2};
+cleanup({{_, _},{{_,_,_}=MFA1,{_,_,_}=MFA2}}) -> {MFA1, MFA2};
+cleanup({{_, _}, {_,_,_}=MFA}) -> MFA;
+cleanup({{_, _}, {{_,_,_}=MFA,_}}) -> MFA;
+cleanup({{_,_,_}=MFA, {_,_,_}}) -> MFA;
+cleanup({{_,_,_}=MFA, {_,_,_},_}) -> MFA;
+cleanup(Other) -> Other.
+
+is_irrelevant({{M,_,_}, {_,_,_}}) ->
+ is_irrelevant(M);
+is_irrelevant({M,_,_}) ->
+ is_irrelevant(M);
+is_irrelevant(Mod) when is_atom(Mod) ->
+ lists:member(Mod, get({?MODULE, third_party})).
+
+is_unused({{_,_,_}=MFA, {_,_,_}}) ->
+ is_unused(MFA);
+is_unused({M,_F,_A}) ->
+ lists:suffix("_tests", atom_to_list(M));
+is_unused(_) ->
+ false.
+
+is_unresolved_call({_, F, A}) ->
+ UC = get({?MODULE, unresolved_calls}),
+ sets:is_element({'$M_EXPR', F, A}, UC);
+is_unresolved_call(_) ->
+ false.
+
+%% TODO: cache this....
+is_callback({M,_,_}=MFA) ->
+ Attributes = M:module_info(attributes),
+ Behaviours = proplists:append_values(behaviour, Attributes),
+ {_, Callbacks} = lists:foldl(fun acc_behaviours/2, {M, []}, Behaviours),
+ lists:member(MFA, Callbacks);
+is_callback(_) ->
+ false.
+
+acc_behaviours(B, {M, CB}=Acc) ->
+ case catch(B:behaviour_info(callbacks)) of
+ [{_,_} | _] = Callbacks ->
+ {M, CB ++ [{M, F, A} || {F,A} <- Callbacks]};
+ _ ->
+ Acc
+ end.
+
+%%
+%% reporting/output
+%%
+
+report(Results) ->
+ [report_failures(F) || F <- Results],
+ {Errors, Warnings} = partition(Results),
+ report(info, "Completed: ~p errors, ~p warnings~n",
+ [length(Errors), length(Warnings)]),
+ case length(Errors) > 0 of
+ true -> 1;
+ false -> 0
+ end.
+
+report_failures({analysis_error, {Mod, Reason}}) ->
+ report(error, "~s:0 Analysis Error: ~p~n", [source_file(Mod), Reason]);
+report_failures({{Tag, Level}, {{{{M,_,_},L},{{M2,F2,A2},_}},_}}) ->
+ report(Level, "~s:~w ~s ~p:~p/~p~n",
+ [source_file(M), L, Tag, M2, F2, A2]);
+report_failures({{Tag, Level}, {{M,F,A},L}}) ->
+ report(Level, "~s:~w ~s ~p:~p/~p~n", [source_file(M), L, Tag, M, F, A]);
+report_failures({{Tag, Level}, {M,F,A}}) ->
+ report(Level, "~s:unknown ~s ~p:~p/~p~n", [source_file(M), Tag, M, F, A]);
+report_failures(Term) ->
+ report(error, "Ignoring ~p~n", [Term]),
+ ok.
+
+report_progress(Fmt, Args) ->
+ report(info, Fmt, Args).
+
+report(Level, Fmt, Args) ->
+ case {get({?MODULE, quiet}), Level} of
+ {true, error} -> do_report(lookup_prefix(Level), Fmt, Args);
+ {false, _} -> do_report(lookup_prefix(Level), Fmt, Args);
+ _ -> ok
+ end.
+
+do_report(Prefix, Fmt, Args) ->
+ io:format(Prefix ++ Fmt, Args).
+
+lookup_prefix(error) -> "ERROR: ";
+lookup_prefix(warning) -> "WARNING: ";
+lookup_prefix(info) -> "INFO: ".
+
+source_file(M) ->
+ proplists:get_value(source, M:module_info(compile)).
+
+%%
+%% setup/code-path/file-system ops
+%%
+
+store_third_party(App) ->
+ {ok, AppConfig} = application:get_all_key(App),
+ case get({?MODULE, third_party}) of
+ undefined ->
+ put({?MODULE, third_party},
+ proplists:get_value(modules, AppConfig));
+ Modules ->
+ put({?MODULE, third_party},
+ proplists:get_value(modules, AppConfig) ++ Modules)
+ end.
+
+%% TODO: this ought not to be maintained in such a fashion
+external_dependency(Path) ->
+ lists:any(fun(P) -> lists:prefix(P, Path) end,
+ ["mochiweb", "webmachine", "rfc4627", "eldap"]).
+
+unmangle_name(Path) ->
+ [Name, Vsn | _] = re:split(Path, "-", [{return, list}]),
+ string:join([Name, Vsn], "-").
+
+store_unresolved_calls() ->
+ {ok, UCFull} = analyse("UC"),
+ UC = [MFA || {_, {_,_,_} = MFA} <- UCFull],
+ put({?MODULE, unresolved_calls}, sets:from_list(UC)).
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 65308f8ee3..7884228169 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {heartbeat, 600},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
{default_user, <<"guest">>},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e8b4a6232e..d6fac46db7 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, mirror_nodes,
+ policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 91510991cc..f5257040ef 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -101,7 +101,9 @@ Section "RabbitMQ Service" RabbitService
ExpandEnvStrings $0 %COMSPEC%
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install'
ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start'
- CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie"
+ ReadEnvStr $1 "HOMEDRIVE"
+ ReadEnvStr $2 "HOMEPATH"
+ CopyFiles "$WINDIR\.erlang.cookie" "$1$2\.erlang.cookie"
SectionEnd
;--------------------------------
@@ -234,4 +236,4 @@ Function findErlang
System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0'
${EndIf}
-FunctionEnd \ No newline at end of file
+FunctionEnd
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 13ee42499a..68c095d2d5 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -150,8 +150,8 @@
info/0, info/1]).
-export([ulimit/0]).
--export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -195,7 +195,9 @@
obtain_count,
obtain_pending,
clients,
- timer_ref
+ timer_ref,
+ alarm_set,
+ alarm_clear
}).
-record(cstate,
@@ -268,7 +270,11 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+ start_link(fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
+
+start_link(AlarmSet, AlarmClear) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear],
+ [{timeout, infinity}]).
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
@@ -806,7 +812,7 @@ i(Item, _) -> throw({bad_argument, Item}).
%% gen_server2 callbacks
%%----------------------------------------------------------------------------
-init([]) ->
+init([AlarmSet, AlarmClear]) ->
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
@@ -830,7 +836,9 @@ init([]) ->
obtain_count = 0,
obtain_pending = pending_new(),
clients = Clients,
- timer_ref = undefined }}.
+ timer_ref = undefined,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }}.
prioritise_cast(Msg, _State) ->
case Msg of
@@ -1026,10 +1034,11 @@ obtain_limit_reached(#fhc_state { obtain_limit = Limit,
obtain_count = Count}) ->
Limit =/= infinity andalso Count >= Limit.
-adjust_alarm(OldState, NewState) ->
+adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }, NewState) ->
case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
- {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
- {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ {false, true} -> AlarmSet({file_descriptor_limit, []});
+ {true, false} -> AlarmClear(file_descriptor_limit);
_ -> ok
end,
NewState.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 23b422af33..f7953c55d1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -20,7 +20,8 @@
-export([start/0, boot/0, stop/0,
stop_and_halt/0, await_startup/0, status/0, is_running/0,
- is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0,
+ start_fhc/0]).
-export([start/2, stop/1]).
@@ -53,8 +54,7 @@
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [file_handle_cache]}},
+ {mfa, {rabbit, start_fhc, []}},
{requires, pre_boot},
{enables, worker_pool}]}).
@@ -731,3 +731,10 @@ config_files() ->
[File] <- Files];
error -> []
end.
+
+%% We don't want this in fhc since it references rabbit stuff. And we can't put
+%% this in the bootstep directly.
+start_fhc() ->
+ rabbit_sup:start_restartable_child(
+ file_handle_cache,
+ [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d16d90a45d..e6625b2b90 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,22 +18,28 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]).
+-export([start_link/0, start/0, stop/0, register/2, set_alarm/1,
+ clear_alarm/1, get_alarms/0, on_node_up/1, on_node_down/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
-export([remote_conserve_resources/3]). %% Internal use only
--record(alarms, {alertees, alarmed_nodes}).
+-define(SERVER, ?MODULE).
+
+-record(alarms, {alertees, alarmed_nodes, alarms}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(set_alarm/1 :: (any()) -> 'ok').
+-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
@@ -41,59 +47,70 @@
%%----------------------------------------------------------------------------
+start_link() ->
+ gen_event:start_link({local, ?SERVER}).
+
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ ok = rabbit_sup:start_restartable_child(?MODULE),
+ ok = gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
-
+ rabbit_sup:start_restartable_child(
+ vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1,
+ fun rabbit_alarm:clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
-stop() ->
- ok = alarm_handler:delete_alarm_handler(?MODULE).
+stop() -> ok.
register(Pid, HighMemMFA) ->
- gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
infinity).
-on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}).
+set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
+clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
+
+get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
-on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
+on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
+on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
-%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
-%% permit notifying a remote node.
remote_conserve_resources(Pid, Source, true) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{set_alarm, {{resource_limit, Source, node()}, []}});
remote_conserve_resources(Pid, Source, false) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{clear_alarm, {resource_limit, Source, node()}}).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, #alarms{alertees = dict:new(),
- alarmed_nodes = dict:new()}}.
+ alarmed_nodes = dict:new(),
+ alarms = []}}.
handle_call({register, Pid, HighMemMFA}, State) ->
{ok, 0 < dict:size(State#alarms.alarmed_nodes),
internal_register(Pid, HighMemMFA, State)};
+handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
+ {ok, Alarms, State};
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
- {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]});
-handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1,
+ Alarms)});
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
- {alarm_handler, Node},
+ {?SERVER, Node},
{register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
@@ -186,3 +203,25 @@ internal_register(Pid, {M, F, A} = HighMemMFA,
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
State#alarms{alertees = NewAlertees}.
+
+handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
+ rabbit_log:warning("~s resource limit alarm set on node ~p~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+handle_set_alarm({file_descriptor_limit, []}, State) ->
+ rabbit_log:warning("file descriptor limit alarm set~n"),
+ {ok, State};
+handle_set_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' set~n", [Alarm]),
+ {ok, State}.
+
+handle_clear_alarm({resource_limit, Source, Node}, State) ->
+ rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+handle_clear_alarm(file_descriptor_limit, State) ->
+ rabbit_log:warning("file descriptor limit alarm cleared~n"),
+ {ok, State};
+handle_clear_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' cleared~n", [Alarm]),
+ {ok, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index afbaea651b..a5f227bc19 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0]).
+-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
@@ -102,6 +102,7 @@
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
+-spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -215,6 +216,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
+ sync_slave_pids = [],
mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -475,6 +477,8 @@ force_event_refresh(QNames) ->
force_event_refresh(Failed)
end.
+wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
@@ -599,7 +603,7 @@ on_node_down(Node) ->
slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node andalso
- not is_process_alive(Pid)])),
+ not rabbit_misc:is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
@@ -672,13 +676,18 @@ qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
safe_delegate_call_ok(F, Pids) ->
- case delegate:invoke(Pids, fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> F(Pid) end)
- end) of
- {_, []} -> ok;
- {_, Bad} -> {error, Bad}
+ {_, Bads} = delegate:invoke(Pids, fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () -> F(Pid) end)
+ end),
+ case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) ->
+ rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) ->
+ false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
end.
delegate_call(Pid, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8933de8746..b4071627ce 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -93,6 +93,7 @@
consumers,
memory,
slave_pids,
+ synchronised_slave_pids,
backing_queue_status
]).
@@ -102,9 +103,7 @@
durable,
auto_delete,
arguments,
- owner_pid,
- slave_pids,
- synchronised_slave_pids
+ owner_pid
]).
-define(INFO_KEYS,
@@ -788,7 +787,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
case pmon:is_monitored(QPid, QMons) of
false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> case rabbit_misc:is_abnormal_exit(Reason) of
true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
QNameS = rabbit_misc:rs(qname(State)),
rabbit_log:warning("DLQ ~p for ~s died with "
@@ -893,37 +892,7 @@ make_dead_letter_msg(Reason,
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) ->
- {Prefix, Items1} =
- case lists:member(synchronised_slave_pids, Items) of
- true -> Prefix1 = slaves_status(State),
- case lists:member(slave_pids, Items) of
- true -> {Prefix1, Items -- [slave_pids]};
- false -> {proplists:delete(slave_pids, Prefix1), Items}
- end;
- false -> {[], Items}
- end,
- Prefix ++ [{Item, i(Item, State)}
- || Item <- (Items1 -- [synchronised_slave_pids])].
-
-slaves_status(#q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} ->
- [{slave_pids, ''}, {synchronised_slave_pids, ''}];
- {ok, #amqqueue{slave_pids = SPids}} ->
- {Results, _Bad} =
- delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
- {SPids1, SSPids} =
- lists:foldl(
- fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
- {[Pid | SPidsN],
- case proplists:get_bool(is_synchronised, Infos) of
- true -> [Pid | SSPidsN];
- false -> SSPidsN
- end}
- end, {[], []}, Results),
- [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
- end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -957,9 +926,14 @@ i(memory, _) ->
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
{ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
+i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
+ {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids
+ end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1307,7 +1281,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
noreply(State);
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State).
+ dead_letter_msg(Msg, AckTag, Reason, State);
+
+handle_cast(wake_up, State) ->
+ noreply(State).
%% We need to not ignore this as we need to remove outstanding
%% confirms due to queue death.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 864e100afc..69fe0edca3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -405,17 +405,17 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
writer_pid = WriterPid,
reader_pid = ReaderPid,
conn_pid = ConnPid}) ->
- {CloseChannel, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ConnPid, Channel, Reason]),
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
- case CloseChannel of
- Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State1};
- _ -> ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State1}
+ case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
+ {Channel, CloseMethod} ->
+ rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
+ [ConnPid, Channel, Reason]),
+ ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ {0, _} ->
+ ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
end.
precondition_failed(Format) -> precondition_failed(Format, []).
@@ -1116,7 +1116,7 @@ monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
delivering_queues = sets:add_element(QPid, DQ)}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- case rabbit_misc:is_abnormal_termination(Reason) of
+ case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
send_nacks(MXs, State#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index ebfc563691..0b1902509d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -196,11 +196,11 @@ print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) ->
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg).
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
- case Results = rpc_call(Node, Module, InfoFun, VHostArg) of
- [_|_] -> InfoItems = rpc_call(Node, Module, KeysFun, []),
- display_row([atom_to_list(I) || I <- InfoItems]),
- display_info_list(Results, InfoItems);
- _ -> ok
+ case rpc_call(Node, Module, InfoFun, VHostArg) of
+ [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []),
+ display_row([atom_to_list(I) || I <- InfoItems]),
+ display_info_list(Results, InfoItems);
+ _ -> ok
end,
io:nl().
@@ -452,14 +452,13 @@ action(list_parameters, Node, Args = [], _Opts, Inform) ->
rabbit_runtime_parameters:info_keys());
action(report, Node, _Args, _Opts, Inform) ->
- io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]),
+ Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
N <- unsafe_rpc(Node, rabbit_mnesia, running_clustered_nodes, []),
Action <- [status, cluster_status, environment]],
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
- io:format("End of server status report~n"),
ok;
action(eval, Node, [Expr], _Opts, _Inform) ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c07ad832f0..c87b1dc1ec 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -40,7 +40,6 @@
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}).
-
-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok').
-endif.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 58375abb45..e72181c061 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -149,10 +149,10 @@ internal_update(State = #state { limit = Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
- alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
- alarm_handler:clear_alarm({resource_limit, disk, node()});
+ rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
end,
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index e6470b721e..9a793aab09 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -54,13 +54,13 @@
%% called when comparing exchanges for equivalence - should return ok or
%% exit with #amqp_error{}
--callback assert_args_equivalence (rabbit_types:exchange(),
- rabbit_framing:amqp_table()) ->
+-callback assert_args_equivalence(rabbit_types:exchange(),
+ rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
%% called when the policy attached to this exchange changes.
--callback policy_changed (
- serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+-callback policy_changed(serial(), rabbit_types:exchange(),
+ rabbit_types:exchange()) -> 'ok'.
-else.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe55..29e2d29f5c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,7 @@
-export([remove_from_queue/2, on_node_up/0,
drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4]).
+ report_deaths/4, store_updated_slaves/1]).
-include("rabbit.hrl").
@@ -37,6 +37,8 @@
-spec(add_mirror/3 ::
(rabbit_types:vhost(), binary(), atom())
-> rabbit_types:ok_or_error(any())).
+-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
+ rabbit_types:amqqueue()).
-endif.
@@ -58,11 +60,13 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ not lists:member(node(Pid),
+ DeadNodes) orelse
+ rabbit_misc:is_process_alive(Pid)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
@@ -70,9 +74,9 @@ remove_from_queue(QueueName, DeadPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 },
- ok = rabbit_amqqueue:store_queue(Q1),
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -134,22 +138,40 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
end
end).
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} ->
+ %% this means the mirror process was
+ %% already running on the given node.
+ ok;
+ {ok, SPid} ->
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ {error, {{stale_master_pid, StalePid}, _}} ->
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+ ok;
+ {error, {{duplicate_live_master, _}=Err, _}} ->
+ throw(Err);
+ Other ->
+ Other
+ end.
+
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->
@@ -172,3 +194,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
end,
rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+
+store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ ok = rabbit_amqqueue:store_queue(Q1),
+ %% Wake it up so that we emit a stats event
+ rabbit_amqqueue:wake_up(Q1),
+ Q1.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 60d3e027fb..e4d78c45da 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -101,19 +101,10 @@ info(QPid) ->
init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
+ case rabbit_misc:execute_mnesia_transaction(fun() ->
+ init_it(Self, Node,
+ QueueName)
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -150,10 +141,38 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ {stop, {stale_master_pid, StalePid}};
+ duplicate_live_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
ignore
end.
+init_it(Self, Node, QueueName) ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
+ end.
+
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
@@ -447,8 +466,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_event:notify(queue_slave_promoted, [{pid, self()},
- {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
@@ -915,8 +932,17 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
%% unsynchronised: we assert that can never happen.
set_synchronised(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
- rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
- {name, QName}]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2)
+ end
+ end),
State #state { synchronised = true };
set_synchronised(true, State) ->
State;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 51a0c4f06f..6e88de8201 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -29,8 +29,8 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([confirm_to_sender/2]).
--export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([is_abnormal_termination/1]).
+-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
+ filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
@@ -63,6 +63,11 @@
-export([version/0]).
-export([sequence_error/1]).
+%% Horrible macro to use in guards
+-define(IS_BENIGN_EXIT(R),
+ R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
+ R =:= shutdown).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -139,8 +144,8 @@
-spec(throw_on_error/2 ::
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(is_abnormal_exit/1 :: (any()) -> boolean()).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -429,13 +434,14 @@ with_exit_handler(Handler, Thunk) ->
try
Thunk()
catch
- exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
- Handler();
- exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
- Handler()
+ exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler();
+ exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler()
end.
+is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit(_) -> true.
+
filter_exit_map(F, L) ->
Ref = make_ref(),
lists:filter(fun (R) -> R =/= Ref end,
@@ -443,11 +449,6 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
-is_abnormal_termination(Reason)
- when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown -> false;
-is_abnormal_termination({shutdown, _}) -> false;
-is_abnormal_termination(_) -> true.
with_user(Username, Thunk) ->
fun () ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7524600785..19dac70c79 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -313,7 +313,7 @@ handle_other(handshake_timeout, Deb, State)
mainloop(Deb, State);
handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
-handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
throw({heartbeat_timeout, S});
@@ -355,9 +355,9 @@ switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, send_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
@@ -411,8 +411,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
maybe_close(handle_exception(control_throttle(State),
Channel, Reason))
end.
@@ -470,19 +468,46 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), closed, Channel, Reason]),
State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
terminate_channels(),
State1 = close_connection(State),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1.
+ State1;
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
%%--------------------------------------------------------------------------
@@ -501,7 +526,7 @@ create_channel(Channel, State) ->
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
- ok.
+ {ChPid, AState}.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -532,34 +557,32 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, 0, Type, Payload});
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
- Other -> throw({unexpected_frame_on_channel0, Other})
+ _Other -> unexpected_frame(Type, 0, Payload, State)
end;
handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, Channel, Type, Payload});
- heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
- end.
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end;
- undefined when ?IS_RUNNING(State) ->
- ok = create_channel(Channel, State),
- process_frame(Frame, Channel, State);
- undefined ->
- throw({channel_frame_while_starting,
- Channel, State#v1.connection_state, Frame})
+ {ChPid, AState} = case get({channel, Channel}) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
end.
process_channel_frame(Frame, ChPid, AState) ->
@@ -594,14 +617,13 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
- State) ->
- case PayloadAndMarker of
- <<Payload:PayloadSize/binary, ?FRAME_END>> ->
- switch_callback(handle_frame(Type, Channel, Payload, State),
- frame_header, 7);
- _ ->
- throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
+handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
+ <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ case EndMarker of
+ ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
+ switch_callback(State1, frame_header, 7);
+ _ -> frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -672,24 +694,14 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- HandleException =
- fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
- end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:#amqp_error{method = none} = Reason ->
- HandleException(Reason#amqp_error{method = MethodName});
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
- HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
@@ -793,6 +805,10 @@ server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
+server_heartbeat() ->
+ {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
+ Heartbeat.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -844,7 +860,7 @@ auth_phase(Response,
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
- heartbeat = 0},
+ heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User}}
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 18704807ba..47b22b98f6 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -40,6 +40,7 @@
-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
+-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -62,6 +63,7 @@
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
+-spec(sync_slave_pids/0 :: () -> 'ok').
-endif.
@@ -240,6 +242,19 @@ queue_policy(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid,
slave_pids, mirror_nodes, policy]).
+sync_slave_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddSyncSlavesFun =
+ fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) ->
+ {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol}
+ end,
+ [ok = transform(T, AddSyncSlavesFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, mirror_nodes, policy])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fb184d1ab2..df5f73e749 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -27,7 +27,7 @@
-behaviour(gen_server).
--export([start_link/1]).
+-export([start_link/1, start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -51,7 +51,9 @@
memory_limit,
timeout,
timer,
- alarmed
+ alarmed,
+ alarm_set,
+ alarm_clear
}).
%%----------------------------------------------------------------------------
@@ -59,6 +61,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'),
+ fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()).
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
@@ -99,14 +103,21 @@ get_memory_limit() ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+start_link(MemFraction) ->
+ start_link(MemFraction,
+ fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1).
-init([MemFraction]) ->
+start_link(MemFraction, AlarmSet, AlarmClear) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [MemFraction, AlarmSet, AlarmClear], []).
+
+init([MemFraction, AlarmSet, AlarmClear]) ->
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
- alarmed = false},
+ alarmed = false,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear },
{ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From, State) ->
@@ -175,16 +186,18 @@ set_mem_limits(State, MemFraction) ->
memory_limit = MemLim }).
internal_update(State = #state { memory_limit = MemLimit,
- alarmed = Alarmed}) ->
+ alarmed = Alarmed,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }) ->
MemUsed = erlang:memory(total),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
+ AlarmSet({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({resource_limit, memory, node()});
+ AlarmClear({resource_limit, memory, node()});
_ ->
ok
end,