diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-26 18:18:43 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-26 18:18:43 +0100 |
commit | bfa541a0959f1fd9d9a4870fe4698c3e3c2c8098 (patch) | |
tree | 41fef27e2c3b1f8864938cbee9017928d0b7afc9 | |
parent | 8cf5fb077548133b141e24e165e667b5351d6d41 (diff) | |
download | rabbitmq-server-git-rabbitmq-stream-writer-dedupe.tar.gz |
Include publisher state managementrabbitmq-stream-writer-dedupe
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 116 |
1 files changed, 80 insertions, 36 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 59e2284960..425d2dbb23 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -27,7 +27,8 @@ -record(publisher, { publisher_id :: publisher_id(), stream :: stream(), - reference :: 'undefined' | publisher_reference() + reference :: 'undefined' | publisher_reference(), + leader :: pid() }). -record(consumer, { @@ -731,7 +732,6 @@ handle_frame_post_auth(Transport, #stream_connection{ PublisherId:8, ReferenceSize:16, Reference:ReferenceSize/binary, StreamSize:16, Stream:StreamSize/binary>>, Rest) -> - %% FIXME check if the publisher ID and reference do not already exist case rabbit_stream_utils:check_write_permitted( #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, @@ -743,14 +743,15 @@ handle_frame_post_auth(Transport, #stream_connection{ cluster_not_found -> response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {Connection0, State, Rest}; - {_, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} -> + {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} -> {PublisherReference, RefIds1} = case Reference of <<"">> -> {undefined, RefIds0}; _ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}} end, Publisher = #publisher{publisher_id = PublisherId, stream = Stream, - reference = PublisherReference}, + reference = PublisherReference, + leader = ClusterLeader}, response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), {Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher}, publisher_to_ids = RefIds1}, State, Rest} @@ -772,8 +773,9 @@ handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers, Connection1 = Connection0#stream_connection{ publishers = maps:remove(PublisherId, Publishers), publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)}, + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), - {Connection1, State, Rest}; + {Connection2, State, Rest}; _ -> response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), {Connection0, State, Rest} @@ -893,8 +895,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), {Connection, State, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions, - stream_leaders = StreamLeaders} = Connection, +handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) -> case subscription_exists(StreamSubscriptions, SubscriptionId) of @@ -906,28 +907,19 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre Stream = Consumer#consumer.stream, #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream), - {Connection1, StreamSubscriptions1, StreamLeaders1} = + StreamSubscriptions1 = case length(SubscriptionsForThisStream1) of 0 -> - %% no more subscriptions for this stream - %% we unregister even though it could affect publishing if the stream is published to - %% from this connection and is deleted. - %% to mitigate this, we remove the stream from the leaders cache - %% this way the stream leader will be looked up in the next publish command - %% and registered to. - C = demonitor_stream(Stream, Connection), - {C, maps:remove(Stream, StreamSubscriptions), - maps:remove(Stream, StreamLeaders) - }; + % no more subscription for this stream + maps:remove(Stream, StreamSubscriptions); _ -> - {Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders} + StreamSubscriptions#{Stream => SubscriptionsForThisStream1} end, + Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1}, Consumers1 = maps:remove(SubscriptionId, Consumers), + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId), - {Connection1#stream_connection{ - stream_subscriptions = StreamSubscriptions1, - stream_leaders = StreamLeaders1 - }, State#stream_connection_state{consumers = Consumers1}, Rest} + {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, @@ -1213,21 +1205,49 @@ handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]), {Connection, State, Rest}. -clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection, - #stream_connection_state{consumers = Consumers} = State) -> - case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of - {true, _} -> +clean_state_after_stream_deletion_or_failure(Stream, + #stream_connection{stream_subscriptions = StreamSubscriptions, + publishers = Publishers, + publisher_to_ids = PublisherToIds, + stream_leaders = Leaders} = C0, + #stream_connection_state{consumers = Consumers} = S0) -> + {SubscriptionsCleaned, C1, S1} = case stream_has_subscriptions(Stream, C0) of + true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - {cleaned, Connection#stream_connection{ - stream_leaders = maps:remove(Stream, StreamLeaders), + {true, C0#stream_connection{ stream_subscriptions = maps:remove(Stream, StreamSubscriptions) - }, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; - {false, true} -> - {cleaned, Connection#stream_connection{ - stream_leaders = maps:remove(Stream, StreamLeaders) - }, State}; - {false, false} -> - {not_cleaned, Connection, State} + }, S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; + false -> + {false, C0, S0} + end, + {PublishersCleaned, C2, S2} = case stream_has_publishers(Stream, C1) of + true -> + {PurgedPubs, PurgedPubToIds} = maps:fold( + fun(PubId, #publisher{stream = S, reference = Ref}, {Pubs, PubToIds}) -> + case S of + Stream -> + {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)}; + _ -> + {Pubs, PubToIds} + end + end, {Publishers, PublisherToIds}, Publishers), + {true, C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds}, + S1}; + false -> + {false, C1, S1} + end, + {LeadersCleaned, Leaders1} = case Leaders of + #{Stream := _} -> + {true, maps:remove(Stream, Leaders)}; + _ -> + {false, Leaders} + end, + case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of + true -> + C3 = demonitor_stream(Stream, C2), + {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2}; + false -> + {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2} end. lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) -> @@ -1247,6 +1267,16 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual lookup_leader_from_manager(VirtualHost, Stream) -> rabbit_stream_manager:lookup_leader(VirtualHost, Stream). +maybe_clean_connection_from_stream(Stream, #stream_connection{stream_leaders = Leaders} = Connection0) -> + Connection1 = + case {stream_has_publishers(Stream, Connection0), stream_has_subscriptions(Stream, Connection0)} of + {false, false} -> + demonitor_stream(Stream, Connection0); + _ -> + Connection0 + end, + Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}. + maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) -> case lists:member(Stream, maps:values(Monitors)) of true -> @@ -1260,6 +1290,7 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) Monitors = maps:fold(fun(MonitorRef, Strm, Acc) -> case Strm of Stream -> + demonitor(MonitorRef, [flush]), Acc; _ -> maps:put(MonitorRef, Strm, Acc) @@ -1268,6 +1299,19 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) end, #{}, Monitors0), Connection#stream_connection{monitors = Monitors}. +stream_has_subscriptions(Stream, #stream_connection{stream_subscriptions = Subscriptions}) -> + case Subscriptions of + #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 -> + true; + _ -> + false + end. + +stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) -> + lists:any(fun(#publisher{stream = S}) -> + case S of Stream -> true; _ -> false end + end, maps:values(Publishers)). + demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) -> lists:foreach(fun(MonitorRef) -> demonitor(MonitorRef, [flush]) |