summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-04-20 16:49:01 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-04-20 16:49:01 +0200
commit39f8be929d0be0c7dd7c9301cd8138bc21c62d25 (patch)
tree8b42b9d007e2febe28db180522e56dd76e761d30
parent0dc1501f7b73e5049387ed62a975e726a6a3e81d (diff)
downloadrabbitmq-server-git-stream-plugin-blocks-ingress-on-disk-alarm.tar.gz
Block stream publishing on disk alarmstream-plugin-blocks-ingress-on-disk-alarm
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl126
1 files changed, 114 insertions, 12 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 8678fb2aec..c2f28661d0 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -76,6 +76,7 @@
client_properties = #{} :: #{binary() => binary()},
monitors = #{} :: #{reference() => stream()},
stats_timer :: undefined | reference(),
+ resource_alarm :: boolean(),
send_file_oct ::
atomics:atomics_ref()}). % number of bytes sent with send_file (for metrics)
-record(configuration,
@@ -143,6 +144,7 @@
consumers_info/2,
publishers_info/2,
in_vhost/2]).
+-export([resource_alarm/3]).
start_link(KeepaliveSup, Transport, Ref, Opts) ->
Pid = proc_lib:spawn_link(?MODULE, init,
@@ -191,6 +193,7 @@ init([KeepaliveSup,
authentication_state = none,
connection_step = tcp_connected,
frame_max = FrameMax,
+ resource_alarm = false,
send_file_oct = SendFileOct},
State =
#stream_connection_state{consumers = #{},
@@ -198,6 +201,8 @@ init([KeepaliveSup,
data = none},
Transport:setopts(RealSocket, [{active, once}]),
+ rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
+
listen_loop_pre_auth(Transport,
Connection,
State,
@@ -214,6 +219,15 @@ init([KeepaliveSup,
[Error, Reason])
end.
+resource_alarm(ConnectionPid, disk,
+ {_WasAlarmSetForNode,
+ IsThereAnyAlarmsForSameResourceInTheCluster, _Node}) ->
+ ConnectionPid
+ ! {resource_alarm, IsThereAnyAlarmsForSameResourceInTheCluster},
+ ok;
+resource_alarm(_ConnectionPid, _Resource, _Alert) ->
+ ok.
+
socket_op(Sock, Fun) ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case Fun(Sock) of
@@ -225,6 +239,23 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
+should_unblock(#stream_connection{publishers = Publishers}, _)
+ when map_size(Publishers) == 0 ->
+ %% always unblock a connection without publishers
+ true;
+should_unblock(#stream_connection{credits = Credits,
+ resource_alarm = ResourceAlarm},
+ #configuration{credits_required_for_unblocking =
+ CreditsRequiredForUnblocking}) ->
+ case {ResourceAlarm,
+ has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking)}
+ of
+ {true, _} ->
+ false;
+ {false, EnoughCreditsToUnblock} ->
+ EnoughCreditsToUnblock
+ end.
+
init_credit(CreditReference, Credits) ->
atomics:put(CreditReference, 1, Credits).
@@ -280,6 +311,12 @@ listen_loop_pre_auth(Transport,
{OK, Closed, Error, _Passive} = Transport:messages(),
%% FIXME introduce timeout to complete the connection opening (after block should be enough)
receive
+ {resource_alarm, IsThereAlarm} ->
+ listen_loop_pre_auth(Transport,
+ Connection#stream_connection{resource_alarm =
+ IsThereAlarm},
+ State#stream_connection_state{blocked = true},
+ Configuration);
{OK, S, Data} ->
#stream_connection{connection_step = ConnectionStep0} = Connection,
{Connection1, State1} =
@@ -356,7 +393,8 @@ augment_infos_with_user_provided_connection_name(Infos,
Infos
end.
-close(Transport, S, #stream_connection_state{consumers = Consumers}) ->
+close(Transport, S,
+ #stream_connection_state{consumers = Consumers}) ->
[osiris_log:close(Log)
|| #consumer{segment = Log} <- maps:values(Consumers)],
Transport:shutdown(S, write),
@@ -364,6 +402,7 @@ close(Transport, S, #stream_connection_state{consumers = Consumers}) ->
listen_loop_post_auth(Transport,
#stream_connection{socket = S,
+ name = ConnectionName,
stream_subscriptions =
StreamSubscriptions,
credits = Credits,
@@ -383,6 +422,42 @@ listen_loop_post_auth(Transport,
Connection = ensure_stats_timer(Connection0),
{OK, Closed, Error, _Passive} = Transport:messages(),
receive
+ {resource_alarm, IsThereAlarm} ->
+ rabbit_log:debug("Connection ~p received resource alarm. Alarm "
+ "on? ~p",
+ [ConnectionName, IsThereAlarm]),
+ EnoughCreditsToUnblock =
+ has_enough_credits_to_unblock(Credits,
+ CreditsRequiredForUnblocking),
+ NewBlockedState =
+ case {IsThereAlarm, EnoughCreditsToUnblock} of
+ {true, _} ->
+ true;
+ {false, EnoughCredits} ->
+ not EnoughCredits
+ end,
+ rabbit_log:debug("Connection ~p had blocked status set to ~p, new "
+ "blocked status is now ~p",
+ [ConnectionName, Blocked, NewBlockedState]),
+ case {Blocked, NewBlockedState} of
+ {true, false} ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ rabbit_log:debug("Unblocking connection ~p",
+ [ConnectionName]);
+ {false, true} ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ rabbit_log:debug("Blocking connection ~p after resource alarm",
+ [ConnectionName]);
+ _ ->
+ ok
+ end,
+ listen_loop_post_auth(Transport,
+ Connection#stream_connection{resource_alarm =
+ IsThereAlarm},
+ State#stream_connection_state{blocked =
+ NewBlockedState},
+ Configuration);
{OK, S, Data} ->
{Connection1, State1} =
handle_inbound_data_post_auth(Transport,
@@ -406,8 +481,7 @@ listen_loop_post_auth(Transport,
State2 =
case Blocked of
true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
+ case should_unblock(Connection, Configuration)
of
true ->
Transport:setopts(S, [{active, once}]),
@@ -521,9 +595,7 @@ listen_loop_post_auth(Transport,
State1 =
case Blocked of
true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
- of
+ case should_unblock(Connection, Configuration) of
true ->
Transport:setopts(S, [{active, once}]),
ok =
@@ -568,9 +640,7 @@ listen_loop_post_auth(Transport,
State1 =
case Blocked of
true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
- of
+ case should_unblock(Connection, Configuration) of
true ->
Transport:setopts(S, [{active, once}]),
ok =
@@ -724,6 +794,13 @@ listen_loop_post_close(Transport,
%% FIXME demonitor streams
%% FIXME introduce timeout to complete the connection closing (after block should be enough)
receive
+ {resource_alarm, IsThereAlarm} ->
+ handle_inbound_data_post_close(Transport,
+ Connection#stream_connection{resource_alarm
+ =
+ IsThereAlarm},
+ State,
+ Configuration);
{OK, S, Data} ->
Transport:setopts(S, [{active, once}]),
{Connection1, State1} =
@@ -1188,9 +1265,32 @@ notify_auth_result(Username,
[P || {_, V} = P <- EventProps, V =/= '']).
handle_frame_post_auth(Transport,
+ #stream_connection{resource_alarm = true} = Connection0,
+ State,
+ <<?REQUEST:1,
+ ?COMMAND_DECLARE_PUBLISHER:15,
+ ?VERSION_1:16,
+ CorrelationId:32,
+ PublisherId:8,
+ ReferenceSize:16,
+ _Reference:ReferenceSize/binary,
+ StreamSize:16,
+ Stream:StreamSize/binary>>,
+ Rest) ->
+ rabbit_log:info("Cannot create publisher ~p on stream ~p, connection "
+ "is blocked because of resource alarm",
+ [PublisherId, Stream]),
+ response(Transport,
+ Connection0,
+ ?COMMAND_DECLARE_PUBLISHER,
+ CorrelationId,
+ ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection0, State, Rest};
+handle_frame_post_auth(Transport,
#stream_connection{user = User,
publishers = Publishers0,
- publisher_to_ids = RefIds0} =
+ publisher_to_ids = RefIds0,
+ resource_alarm = false} =
Connection0,
State,
<<?REQUEST:1,
@@ -1517,9 +1617,11 @@ handle_frame_post_auth(Transport,
"ion ~p",
[SubscriptionId, Stream,
OffsetSpec]),
- CounterSpec = {{?MODULE, Stream, SubscriptionId, self()}, []},
+ CounterSpec =
+ {{?MODULE, Stream, SubscriptionId, self()}, []},
{ok, Segment} =
- osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec),
+ osiris:init_reader(LocalMemberPid, OffsetSpec,
+ CounterSpec),
rabbit_log:info("Next offset for subscription ~p is ~p",
[SubscriptionId,
osiris_log:next_offset(Segment)]),