diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-04-20 16:49:01 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-04-20 16:49:01 +0200 |
commit | 39f8be929d0be0c7dd7c9301cd8138bc21c62d25 (patch) | |
tree | 8b42b9d007e2febe28db180522e56dd76e761d30 | |
parent | 0dc1501f7b73e5049387ed62a975e726a6a3e81d (diff) | |
download | rabbitmq-server-git-stream-plugin-blocks-ingress-on-disk-alarm.tar.gz |
Block stream publishing on disk alarmstream-plugin-blocks-ingress-on-disk-alarm
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 126 |
1 files changed, 114 insertions, 12 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 8678fb2aec..c2f28661d0 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -76,6 +76,7 @@ client_properties = #{} :: #{binary() => binary()}, monitors = #{} :: #{reference() => stream()}, stats_timer :: undefined | reference(), + resource_alarm :: boolean(), send_file_oct :: atomics:atomics_ref()}). % number of bytes sent with send_file (for metrics) -record(configuration, @@ -143,6 +144,7 @@ consumers_info/2, publishers_info/2, in_vhost/2]). +-export([resource_alarm/3]). start_link(KeepaliveSup, Transport, Ref, Opts) -> Pid = proc_lib:spawn_link(?MODULE, init, @@ -191,6 +193,7 @@ init([KeepaliveSup, authentication_state = none, connection_step = tcp_connected, frame_max = FrameMax, + resource_alarm = false, send_file_oct = SendFileOct}, State = #stream_connection_state{consumers = #{}, @@ -198,6 +201,8 @@ init([KeepaliveSup, data = none}, Transport:setopts(RealSocket, [{active, once}]), + rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), + listen_loop_pre_auth(Transport, Connection, State, @@ -214,6 +219,15 @@ init([KeepaliveSup, [Error, Reason]) end. +resource_alarm(ConnectionPid, disk, + {_WasAlarmSetForNode, + IsThereAnyAlarmsForSameResourceInTheCluster, _Node}) -> + ConnectionPid + ! {resource_alarm, IsThereAnyAlarmsForSameResourceInTheCluster}, + ok; +resource_alarm(_ConnectionPid, _Resource, _Alert) -> + ok. + socket_op(Sock, Fun) -> RealSocket = rabbit_net:unwrap_socket(Sock), case Fun(Sock) of @@ -225,6 +239,23 @@ socket_op(Sock, Fun) -> exit(normal) end. +should_unblock(#stream_connection{publishers = Publishers}, _) + when map_size(Publishers) == 0 -> + %% always unblock a connection without publishers + true; +should_unblock(#stream_connection{credits = Credits, + resource_alarm = ResourceAlarm}, + #configuration{credits_required_for_unblocking = + CreditsRequiredForUnblocking}) -> + case {ResourceAlarm, + has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking)} + of + {true, _} -> + false; + {false, EnoughCreditsToUnblock} -> + EnoughCreditsToUnblock + end. + init_credit(CreditReference, Credits) -> atomics:put(CreditReference, 1, Credits). @@ -280,6 +311,12 @@ listen_loop_pre_auth(Transport, {OK, Closed, Error, _Passive} = Transport:messages(), %% FIXME introduce timeout to complete the connection opening (after block should be enough) receive + {resource_alarm, IsThereAlarm} -> + listen_loop_pre_auth(Transport, + Connection#stream_connection{resource_alarm = + IsThereAlarm}, + State#stream_connection_state{blocked = true}, + Configuration); {OK, S, Data} -> #stream_connection{connection_step = ConnectionStep0} = Connection, {Connection1, State1} = @@ -356,7 +393,8 @@ augment_infos_with_user_provided_connection_name(Infos, Infos end. -close(Transport, S, #stream_connection_state{consumers = Consumers}) -> +close(Transport, S, + #stream_connection_state{consumers = Consumers}) -> [osiris_log:close(Log) || #consumer{segment = Log} <- maps:values(Consumers)], Transport:shutdown(S, write), @@ -364,6 +402,7 @@ close(Transport, S, #stream_connection_state{consumers = Consumers}) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, + name = ConnectionName, stream_subscriptions = StreamSubscriptions, credits = Credits, @@ -383,6 +422,42 @@ listen_loop_post_auth(Transport, Connection = ensure_stats_timer(Connection0), {OK, Closed, Error, _Passive} = Transport:messages(), receive + {resource_alarm, IsThereAlarm} -> + rabbit_log:debug("Connection ~p received resource alarm. Alarm " + "on? ~p", + [ConnectionName, IsThereAlarm]), + EnoughCreditsToUnblock = + has_enough_credits_to_unblock(Credits, + CreditsRequiredForUnblocking), + NewBlockedState = + case {IsThereAlarm, EnoughCreditsToUnblock} of + {true, _} -> + true; + {false, EnoughCredits} -> + not EnoughCredits + end, + rabbit_log:debug("Connection ~p had blocked status set to ~p, new " + "blocked status is now ~p", + [ConnectionName, Blocked, NewBlockedState]), + case {Blocked, NewBlockedState} of + {true, false} -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + rabbit_log:debug("Unblocking connection ~p", + [ConnectionName]); + {false, true} -> + ok = rabbit_heartbeat:pause_monitor(Heartbeater), + rabbit_log:debug("Blocking connection ~p after resource alarm", + [ConnectionName]); + _ -> + ok + end, + listen_loop_post_auth(Transport, + Connection#stream_connection{resource_alarm = + IsThereAlarm}, + State#stream_connection_state{blocked = + NewBlockedState}, + Configuration); {OK, S, Data} -> {Connection1, State1} = handle_inbound_data_post_auth(Transport, @@ -406,8 +481,7 @@ listen_loop_post_auth(Transport, State2 = case Blocked of true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) + case should_unblock(Connection, Configuration) of true -> Transport:setopts(S, [{active, once}]), @@ -521,9 +595,7 @@ listen_loop_post_auth(Transport, State1 = case Blocked of true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) - of + case should_unblock(Connection, Configuration) of true -> Transport:setopts(S, [{active, once}]), ok = @@ -568,9 +640,7 @@ listen_loop_post_auth(Transport, State1 = case Blocked of true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) - of + case should_unblock(Connection, Configuration) of true -> Transport:setopts(S, [{active, once}]), ok = @@ -724,6 +794,13 @@ listen_loop_post_close(Transport, %% FIXME demonitor streams %% FIXME introduce timeout to complete the connection closing (after block should be enough) receive + {resource_alarm, IsThereAlarm} -> + handle_inbound_data_post_close(Transport, + Connection#stream_connection{resource_alarm + = + IsThereAlarm}, + State, + Configuration); {OK, S, Data} -> Transport:setopts(S, [{active, once}]), {Connection1, State1} = @@ -1188,9 +1265,32 @@ notify_auth_result(Username, [P || {_, V} = P <- EventProps, V =/= '']). handle_frame_post_auth(Transport, + #stream_connection{resource_alarm = true} = Connection0, + State, + <<?REQUEST:1, + ?COMMAND_DECLARE_PUBLISHER:15, + ?VERSION_1:16, + CorrelationId:32, + PublisherId:8, + ReferenceSize:16, + _Reference:ReferenceSize/binary, + StreamSize:16, + Stream:StreamSize/binary>>, + Rest) -> + rabbit_log:info("Cannot create publisher ~p on stream ~p, connection " + "is blocked because of resource alarm", + [PublisherId, Stream]), + response(Transport, + Connection0, + ?COMMAND_DECLARE_PUBLISHER, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + {Connection0, State, Rest}; +handle_frame_post_auth(Transport, #stream_connection{user = User, publishers = Publishers0, - publisher_to_ids = RefIds0} = + publisher_to_ids = RefIds0, + resource_alarm = false} = Connection0, State, <<?REQUEST:1, @@ -1517,9 +1617,11 @@ handle_frame_post_auth(Transport, "ion ~p", [SubscriptionId, Stream, OffsetSpec]), - CounterSpec = {{?MODULE, Stream, SubscriptionId, self()}, []}, + CounterSpec = + {{?MODULE, Stream, SubscriptionId, self()}, []}, {ok, Segment} = - osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec), + osiris:init_reader(LocalMemberPid, OffsetSpec, + CounterSpec), rabbit_log:info("Next offset for subscription ~p is ~p", [SubscriptionId, osiris_log:next_offset(Segment)]), |