summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-01-24 17:26:12 +0300
committerGitHub <noreply@github.com>2018-01-24 17:26:12 +0300
commitcef9d18ee171b1b15bb77dcf7f4793c3c043eece (patch)
treea68732bc9a84093a5d950b72082e7aa6f1b1b109
parentaffb941c94de205c304b8e4ea2dc3270896fe9bc (diff)
parent5570b18f5e26096927adffd0c6f2cbcc747ed346 (diff)
downloadrabbitmq-server-git-cef9d18ee171b1b15bb77dcf7f4793c3c043eece.tar.gz
Merge pull request #1477 from rabbitmq/rabbitmq-server-channel-interceptor-hang
Make sure rabbit_misc:pmap callbacks do not throw.
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--test/channel_interceptor_SUITE.erl14
-rw-r--r--test/failing_dummy_interceptor.erl27
3 files changed, 54 insertions, 7 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c671438ce8..1de367e70c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -361,13 +361,29 @@ emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
refresh_config_local() ->
rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_config, infinity) end,
+ fun (C) ->
+ try
+ gen_server2:call(C, refresh_config, infinity)
+ catch _:Reason ->
+ rabbit_log:error("Failed to refresh channel config "
+ "for channel ~p. Reason ~p",
+ [C, Reason])
+ end
+ end,
list_local()),
ok.
refresh_interceptors() ->
rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) end,
+ fun (C) ->
+ try
+ gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
+ catch _:Reason ->
+ rabbit_log:error("Failed to refresh channel interceptors "
+ "for channel ~p. Reason ~p",
+ [C, Reason])
+ end
+ end,
list_local()),
ok.
diff --git a/test/channel_interceptor_SUITE.erl b/test/channel_interceptor_SUITE.erl
index 0e4948ea3c..4081086f4f 100644
--- a/test/channel_interceptor_SUITE.erl
+++ b/test/channel_interceptor_SUITE.erl
@@ -29,7 +29,8 @@ all() ->
groups() ->
[
{non_parallel_tests, [], [
- register_interceptor
+ register_interceptor,
+ register_failing_interceptors
]}
].
@@ -71,9 +72,9 @@ end_per_testcase(Testcase, Config) ->
register_interceptor(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, register_interceptor1, [Config]).
+ ?MODULE, register_interceptor1, [Config, dummy_interceptor]).
-register_interceptor1(Config) ->
+register_interceptor1(Config, Interceptor) ->
PredefinedChannels = rabbit_channel:list(),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
@@ -89,8 +90,8 @@ register_interceptor1(Config) ->
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor">>,
- dummy_interceptor),
- [{interceptors, [{dummy_interceptor, undefined}]}] =
+ Interceptor),
+ [{interceptors, [{Interceptor, undefined}]}] =
rabbit_channel:info(ChannelProc, [interceptors]),
check_send_receive(Ch1, QName, <<"bar">>, <<"">>),
@@ -102,6 +103,9 @@ register_interceptor1(Config) ->
check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
passed.
+register_failing_interceptors(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).
check_send_receive(Ch1, QName, Send, Receive) ->
amqp_channel:call(Ch1,
diff --git a/test/failing_dummy_interceptor.erl b/test/failing_dummy_interceptor.erl
new file mode 100644
index 0000000000..62669e7f1f
--- /dev/null
+++ b/test/failing_dummy_interceptor.erl
@@ -0,0 +1,27 @@
+-module(failing_dummy_interceptor).
+
+-behaviour(rabbit_channel_interceptor).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+
+-compile(export_all).
+
+init(_Ch) ->
+ timer:sleep(15500),
+ undefined.
+
+description() ->
+ [{description,
+ <<"Empties payload on publish">>}].
+
+intercept(#'basic.publish'{} = Method, Content, _IState) ->
+ Content2 = Content#content{payload_fragments_rev = []},
+ {Method, Content2};
+
+intercept(Method, Content, _VHost) ->
+ {Method, Content}.
+
+applies_to() ->
+ ['basic.publish'].