diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-27 16:19:50 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-27 16:19:50 +0100 |
| commit | fabe615f03ce12cdfa7ddf1a8ed0a5eb1c23ce32 (patch) | |
| tree | 8581ea0b1b4cc39c94c75114779e2b07cec4840d /src | |
| parent | 32d6e55012f7db98b7a40ee06e943e063ddd49c7 (diff) | |
| parent | 96682191f028615959994aca91b2d0dd73591b95 (diff) | |
| download | rabbitmq-server-git-fabe615f03ce12cdfa7ddf1a8ed0a5eb1c23ce32.tar.gz | |
Merge in stable
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_autoheal.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_ssl.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 2 | ||||
| -rw-r--r-- | src/truncate.erl | 20 |
13 files changed, 132 insertions, 62 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c2d7e29d6b..29e38c1f5f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -794,9 +794,25 @@ config_files() -> end, case init:get_argument(config) of {ok, Files} -> [Abs(File) || [File] <- Files]; - error -> case os:getenv("RABBITMQ_CONFIG_FILE") of - false -> []; - File -> [Abs(File) ++ " (not found)"] + error -> case config_setting() of + none -> []; + File -> [Abs(File) ++ " (not found)"] + end + end. + +%% This is a pain. We want to know where the config file is. But we +%% can't specify it on the command line if it is missing or the VM +%% will fail to start, so we need to find it by some mechanism other +%% than init:get_arguments/0. We can look at the environment variable +%% which is responsible for setting it... but that doesn't work for a +%% Windows service since the variable can change and the service not +%% be reinstalled, so in that case we add a magic application env. +config_setting() -> + case application:get_env(rabbit, windows_service_config) of + {ok, File1} -> File1; + undefined -> case os:getenv("RABBITMQ_CONFIG_FILE") of + false -> none; + File2 -> File2 end end. diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 826bfc45d4..c5237d346d 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -118,6 +118,7 @@ node_down(Node, _State) -> handle_msg({request_start, Node}, not_healing, Partitions) -> rabbit_log:info("Autoheal request received from ~p~n", [Node]), + rabbit_node_monitor:ping_all(), case rabbit_node_monitor:all_rabbit_nodes_up() of false -> not_healing; true -> AllPartitions = all_partitions(Partitions), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eb9ed4ed07..74f9cacf76 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -187,7 +187,7 @@ force_event_refresh(Ref) -> %%--------------------------------------------------------------------------- -init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost, +init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), @@ -195,7 +195,7 @@ init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost, State = #ch{state = starting, protocol = Protocol, channel = Channel, - reader_pid = Foo, + reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, @@ -894,7 +894,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - test(State, ExchangeName), + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -1118,7 +1119,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, limiter = Limiter}) -> - State1 = test2(State, Msgs), + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), Rev = fun (X) -> lists:reverse(lists:sort(X)) end, lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1); ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter) @@ -1164,13 +1165,6 @@ handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). -test2(State, Msgs) -> - rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs). - -test(State, ExchangeName) -> - check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, State). - %%---------------------------------------------------------------------------- %% We get the queue process to send the consume_ok on our behalf. This diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index fbf13a90c8..031a04f0a1 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -107,8 +107,8 @@ init([Limit]) -> {stop, unsupported_platform} end. -handle_call(get_disk_free_limit, _From, State) -> - {reply, interpret_limit(State#state.limit), State}; +handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) -> + {reply, Limit, State}; handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; @@ -153,29 +153,29 @@ code_change(_OldVsn, State, _Extra) -> % the partition / drive containing this directory will be monitored dir() -> rabbit_mnesia:dir(). -set_disk_limits(State, Limit) -> +set_disk_limits(State, Limit0) -> + Limit = interpret_limit(Limit0), State1 = State#state { limit = Limit }, rabbit_log:info("Disk free limit set to ~pMB~n", - [trunc(interpret_limit(Limit) / 1000000)]), + [trunc(Limit / 1000000)]), internal_update(State1). internal_update(State = #state { limit = Limit, dir = Dir, alarmed = Alarmed}) -> - CurrentFreeBytes = get_disk_free(Dir), - LimitBytes = interpret_limit(Limit), - NewAlarmed = CurrentFreeBytes < LimitBytes, + CurrentFree = get_disk_free(Dir), + NewAlarmed = CurrentFree < Limit, case {Alarmed, NewAlarmed} of {false, true} -> - emit_update_info("insufficient", CurrentFreeBytes, LimitBytes), + emit_update_info("insufficient", CurrentFree, Limit), rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> - emit_update_info("sufficient", CurrentFreeBytes, LimitBytes), + emit_update_info("sufficient", CurrentFree, Limit), rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, - State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}. + State #state {alarmed = NewAlarmed, actual = CurrentFree}. get_disk_free(Dir) -> get_disk_free(Dir, os:type()). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 993f56f92f..353da0a7e2 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -27,6 +27,8 @@ -export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +-import(rabbit_error_logger_file_h, [safe_handle_event/3]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -65,10 +67,13 @@ code_change(_OldVsn, State, _Extra) -> handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({Kind, _Gleader, {_Pid, Format, Data}}, State) -> +handle_event(Event, State) -> + safe_handle_event(fun handle_event0/2, Event, State). + +handle_event0({Kind, _Gleader, {_Pid, Format, Data}}, State) -> ok = publish(Kind, Format, Data, State), {ok, State}; -handle_event(_Event, State) -> +handle_event0(_Event, State) -> {ok, State}. handle_info(_Info, State) -> diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 16ab6d3a0e..be84273904 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -22,6 +22,8 @@ -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). +-export([safe_handle_event/3]). + %% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h %% module because the original's init/1 does not match properly %% with the result of closing the old handler when swapping handlers. @@ -77,8 +79,21 @@ init_file(File, PrevHandler) -> Error -> Error end. +handle_event(Event, State) -> + safe_handle_event(fun handle_event0/2, Event, State). + +safe_handle_event(HandleEvent, Event, State) -> + try + HandleEvent(Event, State) + catch + _:Error -> + io:format("Event crashed log handler:~n~P~n~P~n", + [Event, 30, Error, 30]), + {ok, State} + end. + %% filter out "application: foo; exited: stopped; type: temporary" -handle_event({info_report, _, {_, std_info, _}}, State) -> +handle_event0({info_report, _, {_, std_info, _}}, State) -> {ok, State}; %% When a node restarts quickly it is possible the rest of the cluster %% will not have had the chance to remove its queues from @@ -88,7 +103,7 @@ handle_event({info_report, _, {_, std_info, _}}, State) -> %% logs an event for every one of those messages; in extremis this can %% bring the server to its knees just logging "Discarding..." %% again and again. So just log the first one, then go silent. -handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, +handle_event0(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, State) -> case get(discarding_message_seen) of true -> {ok, State}; @@ -96,10 +111,10 @@ handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, error_logger_file_h:handle_event(t(Event), State) end; %% Clear this state if we log anything else (but not a progress report). -handle_event(Event = {info_msg, _, _}, State) -> +handle_event0(Event = {info_msg, _, _}, State) -> erase(discarding_message_seen), error_logger_file_h:handle_event(t(Event), State); -handle_event(Event, State) -> +handle_event0(Event, State) -> error_logger_file_h:handle_event(t(Event), State). handle_info(Info, State) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9082dbd353..0791bbe23d 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -16,8 +16,8 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, - stop_tcp_listener/1, on_node_down/1, active_listeners/0, +-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2, + on_node_down/1, active_listeners/0, node_listeners/1, register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, @@ -60,10 +60,10 @@ -type(label() :: string()). -spec(start/0 :: () -> 'ok'). +-spec(killall/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(start_ssl_listener/2 :: (listener_config(), rabbit_types:infos()) -> 'ok'). --spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(register_connection/1 :: (pid()) -> ok). @@ -145,6 +145,25 @@ start() -> rabbit_sup:start_supervisor_child( [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]). +%% We are going to stop for pause-minority, so we are already +%% compromised; anything we confirm from now on is not going to be +%% remembered after we come back. Since rabbit:stop/0 may take a while +%% to gracefully shut down, we should stop talking to the outside +%% world *immediately*. +killall() -> + %% Stop ASAP + kill_connections(), + {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners), + {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners), + [stop_listener(L) || L <- TCPListeners ++ SSLListeners], + %% In case anything reconnected while we were stopping listeners + kill_connections(), + ok. + +kill_connections() -> + Conns = connections_local() ++ rabbit_direct:list_local(), + [exit(P, kill) || P <- Conns]. + ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), @@ -245,12 +264,12 @@ start_listener0(Address, Protocol, Label, OnConnect) -> {rabbit_misc:ntoa(IPAddress), Port}}) end. -stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Address) || +stop_listener(Listener) -> + [stop_listener0(Address) || Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family}) -> +stop_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 1496147848..22b0c28087 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -31,7 +31,7 @@ code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1]). +-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -63,6 +63,7 @@ -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). +-spec(ping_all/0 :: () -> 'ok'). -endif. @@ -301,12 +302,11 @@ handle_info(ping_nodes, State) -> %% to ping the nodes that are up, after all. State1 = State#state{down_ping_timer = undefined}, Self = self(), - %% all_nodes_up() both pings all the nodes and tells us if we need to again. - %% %% We ping in a separate process since in a partition it might %% take some noticeable length of time and we don't want to block %% the node monitor for that long. spawn_link(fun () -> + ping_all(), case all_nodes_up() of true -> ok; false -> Self ! ping_again @@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), - Nodes = rabbit_mnesia:cluster_nodes(all), run_outside_applications(fun () -> + rabbit_networking:killall(), rabbit:stop(), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end), ok. @@ -381,11 +381,12 @@ run_outside_applications(Fun) -> end end). -wait_for_cluster_recovery(Nodes) -> +wait_for_cluster_recovery() -> + ping_all(), case majority() of true -> rabbit:start(); false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end. handle_dead_rabbit(Node, State = #state{partitions = Partitions, @@ -453,6 +454,11 @@ del_node(Node, Nodes) -> Nodes -- [Node]. %% functions here. "rabbit" in a function's name implies we test if %% the rabbit application is up, not just the node. +%% As we use these functions to decide what to do in pause_minority +%% state, they *must* be fast, even in the case where TCP connections +%% are timing out. So that means we should be careful about whether we +%% connect to nodes which are currently disconnected. + majority() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) / length(Nodes) > 0.5. @@ -465,9 +471,14 @@ all_rabbit_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). -alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. +alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. + +%% This one is allowed to connect! +ping_all() -> + [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], + ok. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 4cc9cd12f1..6a6a4ee680 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -90,9 +90,9 @@ dist_port_set_check() -> {none, none} -> ok; _ -> rabbit_misc:quit(?DIST_PORT_CONFIGURED) end; + {ok, _} -> + ok; {error, _} -> - %% TODO can we present errors more nicely here - %% than after -config has failed? ok end end. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 4881210dc5..2dd16702b6 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -22,6 +22,8 @@ -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). +-import(rabbit_error_logger_file_h, [safe_handle_event/3]). + %% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h %% module because the original's init/1 does not match properly %% with the result of closing the old handler when swapping handlers. @@ -67,6 +69,9 @@ init_file({File, Type}) -> end. handle_event(Event, State) -> + safe_handle_event(fun handle_event0/2, Event, State). + +handle_event0(Event, State) -> sasl_report_file_h:handle_event( truncate:log_event(Event, ?LOG_TRUNC), State). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e289489697..bd5dcf070b 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -104,15 +104,11 @@ peer_cert_auth_name(common_name, Cert) -> auth_config_sane() -> {ok, Opts} = application:get_env(rabbit, ssl_options), - case {proplists:get_value(fail_if_no_peer_cert, Opts), - proplists:get_value(verify, Opts)} of - {true, verify_peer} -> - true; - {F, V} -> - rabbit_log:warning("SSL certificate authentication disabled, " - "fail_if_no_peer_cert=~p; " - "verify=~p~n", [F, V]), - false + case proplists:get_value(verify, Opts) of + verify_peer -> true; + V -> rabbit_log:warning("SSL certificate authentication " + "disabled, verify=~p~n", [V]), + false end. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 48bbf64a95..6fe65c12a2 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -86,7 +86,7 @@ mnesia_memory() -> case mnesia:system_info(is_running) of yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) || Tab <- mnesia:system_info(tables)]); - no -> 0 + _ -> 0 end. ets_memory(Name) -> diff --git a/src/truncate.erl b/src/truncate.erl index 1c8332590a..820af1bf86 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -102,17 +102,17 @@ term_limit(Thing, Max) -> term_size(B, M, _W) when is_bitstring(B) -> lim(M, size(B)); term_size(A, M, W) when is_atom(A) -> lim(M, 2 * W); term_size(N, M, W) when is_number(N) -> lim(M, 2 * W); -term_size(F, M, W) when is_function(F) -> lim(M, erts_debug:flat_size(F) * W); -term_size(P, M, W) when is_pid(P) -> lim(M, erts_debug:flat_size(P) * W); term_size(T, M, W) when is_tuple(T) -> tuple_term_size( T, M, 1, tuple_size(T), W); -term_size([], M, _W) -> +term_size([], M, _W) -> M; term_size([H|T], M, W) -> case term_size(H, M, W) of limit_exceeded -> limit_exceeded; M2 -> lim(term_size(T, M2, W), 2 * W) - end. + end; +term_size(X, M, W) -> + lim(M, erts_debug:flat_size(X) * W). lim(S, T) when is_number(S) andalso S > T -> S - T; lim(_, _) -> limit_exceeded. @@ -156,6 +156,8 @@ test_short_examples_exactly() -> P = spawn(fun() -> receive die -> ok end end), F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]), P ! die, + R = make_ref(), + F([R], [R]), ok. test_term_limit() -> @@ -163,8 +165,14 @@ test_term_limit() -> S = <<"abc">>, 1 = term_size(S, 4, W), limit_exceeded = term_size(S, 3, W), - 62 = term_size([S, S], 100, W), - 46 = term_size([S, [S]], 100, W), + case 100 - term_size([S, S], 100, W) of + 22 -> ok; %% 32 bit + 38 -> ok %% 64 bit + end, + case 100 - term_size([S, [S]], 100, W) of + 30 -> ok; %% ditto + 54 -> ok + end, limit_exceeded = term_size([S, S], 6, W), ok. |
