summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-04-20 21:17:03 +0300
committerGitHub <noreply@github.com>2021-04-20 21:17:03 +0300
commitb5ed4e7ca1a3bb5bf1b9d720a7256ea79f2952ef (patch)
tree8428deba20558bd2cd21ea536d6262f2eaa396c9
parent0dc1501f7b73e5049387ed62a975e726a6a3e81d (diff)
parentd78e14ad3bbbc079f231a6812dd63e87195b0d8d (diff)
downloadrabbitmq-server-git-b5ed4e7ca1a3bb5bf1b9d720a7256ea79f2952ef.tar.gz
Merge pull request #2989 from Ayanda-D/channel-interceptor-amqp-errors
Allow AMQP error responses in channel interceptors
-rw-r--r--deps/rabbit/src/rabbit_channel_interceptor.erl14
-rw-r--r--deps/rabbit/test/channel_interceptor_SUITE.erl51
-rw-r--r--deps/rabbit/test/dummy_interceptor.erl8
3 files changed, 69 insertions, 4 deletions
diff --git a/deps/rabbit/src/rabbit_channel_interceptor.erl b/deps/rabbit/src/rabbit_channel_interceptor.erl
index 095bbd2583..38d4456549 100644
--- a/deps/rabbit/src/rabbit_channel_interceptor.erl
+++ b/deps/rabbit/src/rabbit_channel_interceptor.erl
@@ -29,7 +29,7 @@
-callback init(rabbit_channel:channel()) -> interceptor_state().
-callback intercept(original_method(), original_content(),
interceptor_state()) ->
- {processed_method(), processed_content()} |
+ {processed_method(), processed_content()} | rabbit_types:amqp_error() |
rabbit_misc:channel_or_connection_exit().
-callback applies_to() -> list(method_name()).
@@ -88,7 +88,9 @@ validate_response(Mod, M1, C1, R = {M2, C2}) ->
"content iff content is provided but "
"content in = ~p; content out = ~p",
[Mod, C1, C2])
- end.
+ end;
+validate_response(_Mod, _M1, _C1, AMQPError = #amqp_error{}) ->
+ internal_error(AMQPError).
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
@@ -98,6 +100,12 @@ validate_content(#content{}, #content{}) -> true;
validate_content(_, _) -> false.
%% keep dialyzer happy
--spec internal_error(string(), [any()]) -> no_return().
+-spec internal_error(rabbit_types:amqp_error()) ->
+ rabbit_misc:channel_or_connection_exit().
+internal_error(AMQPError = #amqp_error{}) ->
+ rabbit_misc:protocol_error(AMQPError).
+
+-spec internal_error(string(), [any()]) ->
+ rabbit_misc:channel_or_connection_exit().
internal_error(Format, Args) ->
rabbit_misc:protocol_error(internal_error, Format, Args).
diff --git a/deps/rabbit/test/channel_interceptor_SUITE.erl b/deps/rabbit/test/channel_interceptor_SUITE.erl
index 1f2126f57f..f0bfa48a63 100644
--- a/deps/rabbit/test/channel_interceptor_SUITE.erl
+++ b/deps/rabbit/test/channel_interceptor_SUITE.erl
@@ -9,6 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@@ -21,6 +22,7 @@ groups() ->
[
{non_parallel_tests, [], [
register_interceptor,
+ register_interceptor_failing_with_amqp_error,
register_failing_interceptors
]}
].
@@ -94,6 +96,55 @@ register_interceptor1(Config, Interceptor) ->
check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
passed.
+register_interceptor_failing_with_amqp_error(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, register_interceptor_failing_with_amqp_error1,
+ [Config, dummy_interceptor]).
+
+register_interceptor_failing_with_amqp_error1(Config, Interceptor) ->
+ PredefinedChannels = rabbit_channel:list(),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
+
+ [ChannelProc] = rabbit_channel:list() -- PredefinedChannels,
+
+ [{interceptors, []}] = rabbit_channel:info(ChannelProc, [interceptors]),
+
+ ok = rabbit_registry:register(channel_interceptor,
+ <<"dummy interceptor">>,
+ Interceptor),
+ [{interceptors, [{Interceptor, undefined}]}] =
+ rabbit_channel:info(ChannelProc, [interceptors]),
+
+ Q1 = <<"succeeding-q">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}),
+
+ Q2 = <<"failing-q">>,
+ try
+ amqp_channel:call(Ch1, #'queue.declare'{queue = Q2})
+ catch
+ _:Reason ->
+ ?assertMatch(
+ {{shutdown, {_, _, <<"PRECONDITION_FAILED - operation not allowed">>}}, _},
+ Reason)
+ end,
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
+ [ChannelProc1] = rabbit_channel:list() -- PredefinedChannels,
+
+ ok = rabbit_registry:unregister(channel_interceptor,
+ <<"dummy interceptor">>),
+ [{interceptors, []}] = rabbit_channel:info(ChannelProc1, [interceptors]),
+
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = Q2}),
+
+ #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q1}),
+ #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q2}),
+
+ passed.
+
register_failing_interceptors(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).
diff --git a/deps/rabbit/test/dummy_interceptor.erl b/deps/rabbit/test/dummy_interceptor.erl
index 6d510a3073..d173e1474a 100644
--- a/deps/rabbit/test/dummy_interceptor.erl
+++ b/deps/rabbit/test/dummy_interceptor.erl
@@ -19,8 +19,14 @@ intercept(#'basic.publish'{} = Method, Content, _IState) ->
Content2 = Content#content{payload_fragments_rev = []},
{Method, Content2};
+%% Use 'queue.declare' to test #amqp_error{} handling
+intercept(#'queue.declare'{queue = <<"failing-q">>}, _Content, _IState) ->
+ rabbit_misc:amqp_error(
+ 'precondition_failed', "operation not allowed", [],
+ 'queue.declare');
+
intercept(Method, Content, _VHost) ->
{Method, Content}.
applies_to() ->
- ['basic.publish'].
+ ['basic.publish', 'queue.declare'].