summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLuke Bakken <lbakken@pivotal.io>2018-03-13 09:00:50 -0700
committerMichael Klishin <michael@clojurewerkz.org>2018-03-16 09:25:15 +0300
commitfbddd66e964eaca136b8e3f147df7b9f3790fdce (patch)
tree33385af2d049c7a53094203e334b93a4a39e6efd /src
parent0e57c034e31fe7f5ad6f05f144afa38f24a0676d (diff)
downloadrabbitmq-server-git-fbddd66e964eaca136b8e3f147df7b9f3790fdce.tar.gz
Add special case in handle_other for normal TCP port exit
Handle noport at epmd monitor startup Handle EXIT from TCP port more gracefully Ensure that Parent pid is matched (cherry picked from commit e8d492b75e5c5f6c70f9ea8290a0c8a06362181a)
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_epmd_monitor.erl55
-rw-r--r--src/rabbit_reader.erl17
2 files changed, 48 insertions, 24 deletions
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
index 9d8044e6ef..1a14a640d5 100644
--- a/src/rabbit_epmd_monitor.erl
+++ b/src/rabbit_epmd_monitor.erl
@@ -48,16 +48,26 @@
%% epmd" as a shutdown or uninstall step.
%% ----------------------------------------------------------------------------
-start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
{Me, Host} = rabbit_nodes:parts(node()),
Mod = net_kernel:epmd_module(),
- {port, Port, _Version} = Mod:port_please(Me, Host),
- {ok, ensure_timer(#state{mod = Mod,
- me = Me,
- host = Host,
- port = Port})}.
+ init_handle_port_please(Mod:port_please(Me, Host), Mod, Me, Host).
+
+init_handle_port_please(noport, Mod, Me, Host) ->
+ State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = undefined},
+ {ok, ensure_timer(State)};
+init_handle_port_please({port, Port, _Version}, Mod, Me, Host) ->
+ State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port},
+ {ok, ensure_timer(State)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -65,9 +75,9 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(check, State) ->
- check_epmd(State),
- {noreply, ensure_timer(State#state{timer = undefined})};
+handle_info(check, State0) ->
+ {ok, State1} = check_epmd(State0),
+ {noreply, ensure_timer(State1#state{timer = undefined})};
handle_info(_Info, State) ->
{noreply, State}.
@@ -83,15 +93,18 @@ code_change(_OldVsn, State, _Extra) ->
ensure_timer(State) ->
rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
-check_epmd(#state{mod = Mod,
- me = Me,
- host = Host,
- port = Port}) ->
- case Mod:port_please(Me, Host) of
- noport -> rabbit_log:warning(
- "epmd does not know us, re-registering ~s at port ~b~n",
- [Me, Port]),
- rabbit_nodes:ensure_epmd(),
- Mod:register_node(Me, Port);
- _ -> ok
- end.
+check_epmd(State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port}) ->
+ Port1 = case Mod:port_please(Me, Host) of
+ noport ->
+ rabbit_log:warning("epmd does not know us, re-registering ~s at port ~b~n",
+ [Me, Port]),
+ Port;
+ {port, NewPort, _Version} ->
+ NewPort
+ end,
+ rabbit_nodes:ensure_epmd(),
+ Mod:register_node(Me, Port1),
+ {ok, State#state{port = Port1}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6e2ed2a889..1b281234a3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -547,9 +547,14 @@ handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
{_, State1} = channel_cleanup(ChPid, State),
maybe_close(control_throttle(State1));
+handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
+ %% rabbitmq/rabbitmq-server#544
+ %% The connection port process has exited due to the TCP socket being closed.
+ %% Handle this case in the same manner as receiving {error, closed}
+ stop(closed, State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
+ Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
+ terminate(Msg, State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -734,7 +739,7 @@ wait_for_channel_termination(N, TimerRef,
wait_for_channel_termination(N-1, TimerRef, State1)
end;
{'EXIT', Sock, _Reason} ->
- [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
+ clean_up_all_channels(State),
exit(normal);
cancel_wait ->
exit(channel_termination_timeout)
@@ -903,6 +908,12 @@ channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+clean_up_all_channels(State) ->
+ CleanupFun = fun(ChPid) ->
+ channel_cleanup(ChPid, State)
+ end,
+ lists:foreach(CleanupFun, all_channels()).
+
%%--------------------------------------------------------------------------
handle_frame(Type, 0, Payload,