summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-27 16:19:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-27 16:19:50 +0100
commitfabe615f03ce12cdfa7ddf1a8ed0a5eb1c23ce32 (patch)
tree8581ea0b1b4cc39c94c75114779e2b07cec4840d /src
parent32d6e55012f7db98b7a40ee06e943e063ddd49c7 (diff)
parent96682191f028615959994aca91b2d0dd73591b95 (diff)
downloadrabbitmq-server-git-fabe615f03ce12cdfa7ddf1a8ed0a5eb1c23ce32.tar.gz
Merge in stable
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_autoheal.erl1
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_disk_monitor.erl20
-rw-r--r--src/rabbit_error_logger.erl9
-rw-r--r--src/rabbit_error_logger_file_h.erl23
-rw-r--r--src/rabbit_networking.erl31
-rw-r--r--src/rabbit_node_monitor.erl27
-rw-r--r--src/rabbit_prelaunch.erl4
-rw-r--r--src/rabbit_sasl_report_file_h.erl5
-rw-r--r--src/rabbit_ssl.erl14
-rw-r--r--src/rabbit_vm.erl2
-rw-r--r--src/truncate.erl20
13 files changed, 132 insertions, 62 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c2d7e29d6b..29e38c1f5f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -794,9 +794,25 @@ config_files() ->
end,
case init:get_argument(config) of
{ok, Files} -> [Abs(File) || [File] <- Files];
- error -> case os:getenv("RABBITMQ_CONFIG_FILE") of
- false -> [];
- File -> [Abs(File) ++ " (not found)"]
+ error -> case config_setting() of
+ none -> [];
+ File -> [Abs(File) ++ " (not found)"]
+ end
+ end.
+
+%% This is a pain. We want to know where the config file is. But we
+%% can't specify it on the command line if it is missing or the VM
+%% will fail to start, so we need to find it by some mechanism other
+%% than init:get_arguments/0. We can look at the environment variable
+%% which is responsible for setting it... but that doesn't work for a
+%% Windows service since the variable can change and the service not
+%% be reinstalled, so in that case we add a magic application env.
+config_setting() ->
+ case application:get_env(rabbit, windows_service_config) of
+ {ok, File1} -> File1;
+ undefined -> case os:getenv("RABBITMQ_CONFIG_FILE") of
+ false -> none;
+ File2 -> File2
end
end.
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 826bfc45d4..c5237d346d 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -118,6 +118,7 @@ node_down(Node, _State) ->
handle_msg({request_start, Node},
not_healing, Partitions) ->
rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+ rabbit_node_monitor:ping_all(),
case rabbit_node_monitor:all_rabbit_nodes_up() of
false -> not_healing;
true -> AllPartitions = all_partitions(Partitions),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eb9ed4ed07..74f9cacf76 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -187,7 +187,7 @@ force_event_refresh(Ref) ->
%%---------------------------------------------------------------------------
-init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
+init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
?store_proc_name({ConnName, Channel}),
@@ -195,7 +195,7 @@ init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
- reader_pid = Foo,
+ reader_pid = ReaderPid,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
@@ -894,7 +894,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- test(State, ExchangeName),
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -1118,7 +1119,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
- State1 = test2(State, Msgs),
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
@@ -1164,13 +1165,6 @@ handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
-test2(State, Msgs) ->
- rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs).
-
-test(State, ExchangeName) ->
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State).
-
%%----------------------------------------------------------------------------
%% We get the queue process to send the consume_ok on our behalf. This
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index fbf13a90c8..031a04f0a1 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -107,8 +107,8 @@ init([Limit]) ->
{stop, unsupported_platform}
end.
-handle_call(get_disk_free_limit, _From, State) ->
- {reply, interpret_limit(State#state.limit), State};
+handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
+ {reply, Limit, State};
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
@@ -153,29 +153,29 @@ code_change(_OldVsn, State, _Extra) ->
% the partition / drive containing this directory will be monitored
dir() -> rabbit_mnesia:dir().
-set_disk_limits(State, Limit) ->
+set_disk_limits(State, Limit0) ->
+ Limit = interpret_limit(Limit0),
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB~n",
- [trunc(interpret_limit(Limit) / 1000000)]),
+ [trunc(Limit / 1000000)]),
internal_update(State1).
internal_update(State = #state { limit = Limit,
dir = Dir,
alarmed = Alarmed}) ->
- CurrentFreeBytes = get_disk_free(Dir),
- LimitBytes = interpret_limit(Limit),
- NewAlarmed = CurrentFreeBytes < LimitBytes,
+ CurrentFree = get_disk_free(Dir),
+ NewAlarmed = CurrentFree < Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
- emit_update_info("insufficient", CurrentFreeBytes, LimitBytes),
+ emit_update_info("insufficient", CurrentFree, Limit),
rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
- emit_update_info("sufficient", CurrentFreeBytes, LimitBytes),
+ emit_update_info("sufficient", CurrentFree, Limit),
rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
end,
- State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}.
+ State #state {alarmed = NewAlarmed, actual = CurrentFree}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 993f56f92f..353da0a7e2 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -27,6 +27,8 @@
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
handle_info/2]).
+-import(rabbit_error_logger_file_h, [safe_handle_event/3]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -65,10 +67,13 @@ code_change(_OldVsn, State, _Extra) ->
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({Kind, _Gleader, {_Pid, Format, Data}}, State) ->
+handle_event(Event, State) ->
+ safe_handle_event(fun handle_event0/2, Event, State).
+
+handle_event0({Kind, _Gleader, {_Pid, Format, Data}}, State) ->
ok = publish(Kind, Format, Data, State),
{ok, State};
-handle_event(_Event, State) ->
+handle_event0(_Event, State) ->
{ok, State}.
handle_info(_Info, State) ->
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 16ab6d3a0e..be84273904 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -22,6 +22,8 @@
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
code_change/3]).
+-export([safe_handle_event/3]).
+
%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h
%% module because the original's init/1 does not match properly
%% with the result of closing the old handler when swapping handlers.
@@ -77,8 +79,21 @@ init_file(File, PrevHandler) ->
Error -> Error
end.
+handle_event(Event, State) ->
+ safe_handle_event(fun handle_event0/2, Event, State).
+
+safe_handle_event(HandleEvent, Event, State) ->
+ try
+ HandleEvent(Event, State)
+ catch
+ _:Error ->
+ io:format("Event crashed log handler:~n~P~n~P~n",
+ [Event, 30, Error, 30]),
+ {ok, State}
+ end.
+
%% filter out "application: foo; exited: stopped; type: temporary"
-handle_event({info_report, _, {_, std_info, _}}, State) ->
+handle_event0({info_report, _, {_, std_info, _}}, State) ->
{ok, State};
%% When a node restarts quickly it is possible the rest of the cluster
%% will not have had the chance to remove its queues from
@@ -88,7 +103,7 @@ handle_event({info_report, _, {_, std_info, _}}, State) ->
%% logs an event for every one of those messages; in extremis this can
%% bring the server to its knees just logging "Discarding..."
%% again and again. So just log the first one, then go silent.
-handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
+handle_event0(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
State) ->
case get(discarding_message_seen) of
true -> {ok, State};
@@ -96,10 +111,10 @@ handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
error_logger_file_h:handle_event(t(Event), State)
end;
%% Clear this state if we log anything else (but not a progress report).
-handle_event(Event = {info_msg, _, _}, State) ->
+handle_event0(Event = {info_msg, _, _}, State) ->
erase(discarding_message_seen),
error_logger_file_h:handle_event(t(Event), State);
-handle_event(Event, State) ->
+handle_event0(Event, State) ->
error_logger_file_h:handle_event(t(Event), State).
handle_info(Info, State) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9082dbd353..0791bbe23d 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,8 +16,8 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
- stop_tcp_listener/1, on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2,
+ on_node_down/1, active_listeners/0,
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
@@ -60,10 +60,10 @@
-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
+-spec(killall/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(start_ssl_listener/2 ::
(listener_config(), rabbit_types:infos()) -> 'ok').
--spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(register_connection/1 :: (pid()) -> ok).
@@ -145,6 +145,25 @@ start() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
+%% We are going to stop for pause-minority, so we are already
+%% compromised; anything we confirm from now on is not going to be
+%% remembered after we come back. Since rabbit:stop/0 may take a while
+%% to gracefully shut down, we should stop talking to the outside
+%% world *immediately*.
+killall() ->
+ %% Stop ASAP
+ kill_connections(),
+ {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners),
+ {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners),
+ [stop_listener(L) || L <- TCPListeners ++ SSLListeners],
+ %% In case anything reconnected while we were stopping listeners
+ kill_connections(),
+ ok.
+
+kill_connections() ->
+ Conns = connections_local() ++ rabbit_direct:list_local(),
+ [exit(P, kill) || P <- Conns].
+
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
@@ -245,12 +264,12 @@ start_listener0(Address, Protocol, Label, OnConnect) ->
{rabbit_misc:ntoa(IPAddress), Port}})
end.
-stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Address) ||
+stop_listener(Listener) ->
+ [stop_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family}) ->
+stop_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1496147848..22b0c28087 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -31,7 +31,7 @@
code_change/3]).
%% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/1]).
+-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -63,6 +63,7 @@
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+-spec(ping_all/0 :: () -> 'ok').
-endif.
@@ -301,12 +302,11 @@ handle_info(ping_nodes, State) ->
%% to ping the nodes that are up, after all.
State1 = State#state{down_ping_timer = undefined},
Self = self(),
- %% all_nodes_up() both pings all the nodes and tells us if we need to again.
- %%
%% We ping in a separate process since in a partition it might
%% take some noticeable length of time and we don't want to block
%% the node monitor for that long.
spawn_link(fun () ->
+ ping_all(),
case all_nodes_up() of
true -> ok;
false -> Self ! ping_again
@@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
- Nodes = rabbit_mnesia:cluster_nodes(all),
run_outside_applications(fun () ->
+ rabbit_networking:killall(),
rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end),
ok.
@@ -381,11 +381,12 @@ run_outside_applications(Fun) ->
end
end).
-wait_for_cluster_recovery(Nodes) ->
+wait_for_cluster_recovery() ->
+ ping_all(),
case majority() of
true -> rabbit:start();
false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end.
handle_dead_rabbit(Node, State = #state{partitions = Partitions,
@@ -453,6 +454,11 @@ del_node(Node, Nodes) -> Nodes -- [Node].
%% functions here. "rabbit" in a function's name implies we test if
%% the rabbit application is up, not just the node.
+%% As we use these functions to decide what to do in pause_minority
+%% state, they *must* be fast, even in the case where TCP connections
+%% are timing out. So that means we should be careful about whether we
+%% connect to nodes which are currently disconnected.
+
majority() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
@@ -465,9 +471,14 @@ all_rabbit_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
+
+%% This one is allowed to connect!
+ping_all() ->
+ [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
+ ok.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 4cc9cd12f1..6a6a4ee680 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -90,9 +90,9 @@ dist_port_set_check() ->
{none, none} -> ok;
_ -> rabbit_misc:quit(?DIST_PORT_CONFIGURED)
end;
+ {ok, _} ->
+ ok;
{error, _} ->
- %% TODO can we present errors more nicely here
- %% than after -config has failed?
ok
end
end.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 4881210dc5..2dd16702b6 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -22,6 +22,8 @@
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
code_change/3]).
+-import(rabbit_error_logger_file_h, [safe_handle_event/3]).
+
%% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h
%% module because the original's init/1 does not match properly
%% with the result of closing the old handler when swapping handlers.
@@ -67,6 +69,9 @@ init_file({File, Type}) ->
end.
handle_event(Event, State) ->
+ safe_handle_event(fun handle_event0/2, Event, State).
+
+handle_event0(Event, State) ->
sasl_report_file_h:handle_event(
truncate:log_event(Event, ?LOG_TRUNC), State).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e289489697..bd5dcf070b 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -104,15 +104,11 @@ peer_cert_auth_name(common_name, Cert) ->
auth_config_sane() ->
{ok, Opts} = application:get_env(rabbit, ssl_options),
- case {proplists:get_value(fail_if_no_peer_cert, Opts),
- proplists:get_value(verify, Opts)} of
- {true, verify_peer} ->
- true;
- {F, V} ->
- rabbit_log:warning("SSL certificate authentication disabled, "
- "fail_if_no_peer_cert=~p; "
- "verify=~p~n", [F, V]),
- false
+ case proplists:get_value(verify, Opts) of
+ verify_peer -> true;
+ V -> rabbit_log:warning("SSL certificate authentication "
+ "disabled, verify=~p~n", [V]),
+ false
end.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 48bbf64a95..6fe65c12a2 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -86,7 +86,7 @@ mnesia_memory() ->
case mnesia:system_info(is_running) of
yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
Tab <- mnesia:system_info(tables)]);
- no -> 0
+ _ -> 0
end.
ets_memory(Name) ->
diff --git a/src/truncate.erl b/src/truncate.erl
index 1c8332590a..820af1bf86 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -102,17 +102,17 @@ term_limit(Thing, Max) ->
term_size(B, M, _W) when is_bitstring(B) -> lim(M, size(B));
term_size(A, M, W) when is_atom(A) -> lim(M, 2 * W);
term_size(N, M, W) when is_number(N) -> lim(M, 2 * W);
-term_size(F, M, W) when is_function(F) -> lim(M, erts_debug:flat_size(F) * W);
-term_size(P, M, W) when is_pid(P) -> lim(M, erts_debug:flat_size(P) * W);
term_size(T, M, W) when is_tuple(T) -> tuple_term_size(
T, M, 1, tuple_size(T), W);
-term_size([], M, _W) ->
+term_size([], M, _W) ->
M;
term_size([H|T], M, W) ->
case term_size(H, M, W) of
limit_exceeded -> limit_exceeded;
M2 -> lim(term_size(T, M2, W), 2 * W)
- end.
+ end;
+term_size(X, M, W) ->
+ lim(M, erts_debug:flat_size(X) * W).
lim(S, T) when is_number(S) andalso S > T -> S - T;
lim(_, _) -> limit_exceeded.
@@ -156,6 +156,8 @@ test_short_examples_exactly() ->
P = spawn(fun() -> receive die -> ok end end),
F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]),
P ! die,
+ R = make_ref(),
+ F([R], [R]),
ok.
test_term_limit() ->
@@ -163,8 +165,14 @@ test_term_limit() ->
S = <<"abc">>,
1 = term_size(S, 4, W),
limit_exceeded = term_size(S, 3, W),
- 62 = term_size([S, S], 100, W),
- 46 = term_size([S, [S]], 100, W),
+ case 100 - term_size([S, S], 100, W) of
+ 22 -> ok; %% 32 bit
+ 38 -> ok %% 64 bit
+ end,
+ case 100 - term_size([S, [S]], 100, W) of
+ 30 -> ok; %% ditto
+ 54 -> ok
+ end,
limit_exceeded = term_size([S, S], 6, W),
ok.