diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-07-05 13:23:37 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-07-05 13:23:37 +0100 |
| commit | 055b2a496e32f959fc8ed06fa0f8f60f6dcfed3f (patch) | |
| tree | 9a99ed6c4ff42b48e167a9051f4c4e82cb815d32 /src | |
| parent | 04f8ad3fca3c1cbbe8d0bf2c5c29c35b3b0c7f43 (diff) | |
| parent | f5a71ecfeec07e99fb1baa5e15de3118d72b6f0a (diff) | |
| download | rabbitmq-server-git-055b2a496e32f959fc8ed06fa0f8f60f6dcfed3f.tar.gz | |
Merged bug25659 into stable
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_connection_sup.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_intermediate_sup.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 22 |
4 files changed, 80 insertions, 34 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 31bc51b8b1..fedfe97a6e 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,11 +42,20 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + %% We need to get channels in the hierarchy here so they close + %% before the reader. But for 1.0 readers we can't start the real + %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - + %% so we add another supervisor into the hierarchy. + {ok, ChannelSup3Pid} = + supervisor2:start_child( + SupPid, + {channel_sup3, {rabbit_intermediate_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [SupPid, Collector, + [ChannelSup3Pid, Collector, rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl new file mode 100644 index 0000000000..1919d9d6ed --- /dev/null +++ b/src/rabbit_intermediate_sup.erl @@ -0,0 +1,39 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_intermediate_sup). + +-behaviour(supervisor2). + +-export([start_link/0]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link(?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d8db922961..d282dad064 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -56,7 +56,8 @@ %% Main interface -spec(init/0 :: () -> 'ok'). --spec(join_cluster/2 :: (node(), node_type()) -> 'ok'). +-spec(join_cluster/2 :: (node(), node_type()) + -> 'ok' | {'ok', 'already_member'}). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(update_cluster_nodes/1 :: (node()) -> 'ok'). @@ -164,23 +165,24 @@ join_cluster(DiscoveryNode, NodeType) -> {error, _} = E -> throw(E) end, case me_in_nodes(ClusterNodes) of - true -> e(already_clustered); - false -> ok - end, - - %% reset the node. this simplifies things and it will be needed in - %% this case - we're joining a new cluster with new nodes which - %% are not in synch with the current node. I also lifts the burden - %% of reseting the node from the user. - reset_gracefully(), - - %% Join the cluster - rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", - [ClusterNodes, NodeType]), - ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), - rabbit_node_monitor:notify_joined_cluster(), - - ok. + false -> + %% reset the node. this simplifies things and it will be needed in + %% this case - we're joining a new cluster with new nodes which + %% are not in synch with the current node. I also lifts the burden + %% of reseting the node from the user. + reset_gracefully(), + + %% Join the cluster + rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", + [ClusterNodes, NodeType]), + ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), + rabbit_node_monitor:notify_joined_cluster(), + ok; + true -> + rabbit_misc:local_info_msg("Already member of cluster: ~p~n", + [ClusterNodes]), + {ok, already_member} + end. %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no @@ -859,10 +861,6 @@ error_description(clustering_only_disc_node) -> error_description(resetting_only_disc_node) -> "You cannot reset a node when it is the only disc node in a cluster. " "Please convert another node of the cluster to a disc node first."; -error_description(already_clustered) -> - "You are already clustered with the nodes you have selected. If the " - "node you are trying to cluster with is not present in the current " - "node status, use 'update_cluster_nodes'."; error_description(not_clustered) -> "Non-clustered nodes can only be disc nodes."; error_description(cannot_connect_to_cluster) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 61fac0e268..3cf88d06b2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun, + ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, @@ -103,19 +103,19 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, +start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ConnSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -201,7 +201,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of @@ -240,7 +240,7 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], @@ -837,7 +837,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), @@ -847,7 +847,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, Throttle1 = Throttle#throttle{conserve_resources = Conserve}, {ok, ChannelSupSupPid} = supervisor2:start_child( - ConnSupPid, + ChSup3Pid, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( @@ -1048,9 +1048,9 @@ pack_for_1_0(#v1{parent = Parent, recv_len = RecvLen, pending_recv = PendingRecv, queue_collector = QueueCollector, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, start_heartbeat_fun = SHF, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF, + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, Buf, BufLen}. |
