diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-04-20 21:17:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-20 21:17:03 +0300 |
commit | b5ed4e7ca1a3bb5bf1b9d720a7256ea79f2952ef (patch) | |
tree | 8428deba20558bd2cd21ea536d6262f2eaa396c9 | |
parent | 0dc1501f7b73e5049387ed62a975e726a6a3e81d (diff) | |
parent | d78e14ad3bbbc079f231a6812dd63e87195b0d8d (diff) | |
download | rabbitmq-server-git-b5ed4e7ca1a3bb5bf1b9d720a7256ea79f2952ef.tar.gz |
Merge pull request #2989 from Ayanda-D/channel-interceptor-amqp-errors
Allow AMQP error responses in channel interceptors
-rw-r--r-- | deps/rabbit/src/rabbit_channel_interceptor.erl | 14 | ||||
-rw-r--r-- | deps/rabbit/test/channel_interceptor_SUITE.erl | 51 | ||||
-rw-r--r-- | deps/rabbit/test/dummy_interceptor.erl | 8 |
3 files changed, 69 insertions, 4 deletions
diff --git a/deps/rabbit/src/rabbit_channel_interceptor.erl b/deps/rabbit/src/rabbit_channel_interceptor.erl index 095bbd2583..38d4456549 100644 --- a/deps/rabbit/src/rabbit_channel_interceptor.erl +++ b/deps/rabbit/src/rabbit_channel_interceptor.erl @@ -29,7 +29,7 @@ -callback init(rabbit_channel:channel()) -> interceptor_state(). -callback intercept(original_method(), original_content(), interceptor_state()) -> - {processed_method(), processed_content()} | + {processed_method(), processed_content()} | rabbit_types:amqp_error() | rabbit_misc:channel_or_connection_exit(). -callback applies_to() -> list(method_name()). @@ -88,7 +88,9 @@ validate_response(Mod, M1, C1, R = {M2, C2}) -> "content iff content is provided but " "content in = ~p; content out = ~p", [Mod, C1, C2]) - end. + end; +validate_response(_Mod, _M1, _C1, AMQPError = #amqp_error{}) -> + internal_error(AMQPError). validate_method(M, M2) -> rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). @@ -98,6 +100,12 @@ validate_content(#content{}, #content{}) -> true; validate_content(_, _) -> false. %% keep dialyzer happy --spec internal_error(string(), [any()]) -> no_return(). +-spec internal_error(rabbit_types:amqp_error()) -> + rabbit_misc:channel_or_connection_exit(). +internal_error(AMQPError = #amqp_error{}) -> + rabbit_misc:protocol_error(AMQPError). + +-spec internal_error(string(), [any()]) -> + rabbit_misc:channel_or_connection_exit(). internal_error(Format, Args) -> rabbit_misc:protocol_error(internal_error, Format, Args). diff --git a/deps/rabbit/test/channel_interceptor_SUITE.erl b/deps/rabbit/test/channel_interceptor_SUITE.erl index 1f2126f57f..f0bfa48a63 100644 --- a/deps/rabbit/test/channel_interceptor_SUITE.erl +++ b/deps/rabbit/test/channel_interceptor_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -21,6 +22,7 @@ groups() -> [ {non_parallel_tests, [], [ register_interceptor, + register_interceptor_failing_with_amqp_error, register_failing_interceptors ]} ]. @@ -94,6 +96,55 @@ register_interceptor1(Config, Interceptor) -> check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>), passed. +register_interceptor_failing_with_amqp_error(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, register_interceptor_failing_with_amqp_error1, + [Config, dummy_interceptor]). + +register_interceptor_failing_with_amqp_error1(Config, Interceptor) -> + PredefinedChannels = rabbit_channel:list(), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0), + + [ChannelProc] = rabbit_channel:list() -- PredefinedChannels, + + [{interceptors, []}] = rabbit_channel:info(ChannelProc, [interceptors]), + + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor">>, + Interceptor), + [{interceptors, [{Interceptor, undefined}]}] = + rabbit_channel:info(ChannelProc, [interceptors]), + + Q1 = <<"succeeding-q">>, + #'queue.declare_ok'{} = + amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}), + + Q2 = <<"failing-q">>, + try + amqp_channel:call(Ch1, #'queue.declare'{queue = Q2}) + catch + _:Reason -> + ?assertMatch( + {{shutdown, {_, _, <<"PRECONDITION_FAILED - operation not allowed">>}}, _}, + Reason) + end, + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + [ChannelProc1] = rabbit_channel:list() -- PredefinedChannels, + + ok = rabbit_registry:unregister(channel_interceptor, + <<"dummy interceptor">>), + [{interceptors, []}] = rabbit_channel:info(ChannelProc1, [interceptors]), + + #'queue.declare_ok'{} = + amqp_channel:call(Ch2, #'queue.declare'{queue = Q2}), + + #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q1}), + #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q2}), + + passed. + register_failing_interceptors(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]). diff --git a/deps/rabbit/test/dummy_interceptor.erl b/deps/rabbit/test/dummy_interceptor.erl index 6d510a3073..d173e1474a 100644 --- a/deps/rabbit/test/dummy_interceptor.erl +++ b/deps/rabbit/test/dummy_interceptor.erl @@ -19,8 +19,14 @@ intercept(#'basic.publish'{} = Method, Content, _IState) -> Content2 = Content#content{payload_fragments_rev = []}, {Method, Content2}; +%% Use 'queue.declare' to test #amqp_error{} handling +intercept(#'queue.declare'{queue = <<"failing-q">>}, _Content, _IState) -> + rabbit_misc:amqp_error( + 'precondition_failed', "operation not allowed", [], + 'queue.declare'); + intercept(Method, Content, _VHost) -> {Method, Content}. applies_to() -> - ['basic.publish']. + ['basic.publish', 'queue.declare']. |