diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-05-06 09:15:16 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-05-06 09:15:16 +0200 |
| commit | 2d19e859252c7e143c508aa147269e987c2c7f16 (patch) | |
| tree | e6134adc78f2a41dbca0e416677e9628c37c6acb | |
| parent | b61a79b9ffa20fadc8e2ef52403f0e1a6972c7c7 (diff) | |
| download | rabbitmq-server-git-2d19e859252c7e143c508aa147269e987c2c7f16.tar.gz | |
Use stream instead of target
| -rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 10 | ||||
| -rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 8 | ||||
| -rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 166 | ||||
| -rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 54 |
4 files changed, 119 insertions, 119 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index b6fcb0f6ab..7e46a6f87c 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -13,17 +13,17 @@ -define(COMMAND_OPEN, 12). -define(COMMAND_CLOSE, 13). -define(COMMAND_HEARTBEAT, 14). --define(COMMAND_CREATE_TARGET, 998). --define(COMMAND_DELETE_TARGET, 999). +-define(COMMAND_CREATE_STREAM, 998). +-define(COMMAND_DELETE_STREAM, 999). -define(VERSION_0, 0). -define(RESPONSE_CODE_OK, 0). --define(RESPONSE_CODE_TARGET_DOES_NOT_EXIST, 1). +-define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 1). -define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2). -define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3). --define(RESPONSE_CODE_TARGET_ALREADY_EXISTS, 4). --define(RESPONSE_CODE_TARGET_DELETED, 5). +-define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 4). +-define(RESPONSE_CODE_STREAM_DELETED, 5). -define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 6). -define(RESPONSE_AUTHENTICATION_FAILURE, 7). -define(RESPONSE_SASL_ERROR, 8). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index b6ecb342c7..6c933fa676 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -101,8 +101,8 @@ register() -> unregister() -> gen_server:call(?MODULE, {unregister, self()}). -lookup(Target) -> - gen_server:call(?MODULE, {lookup, Target}). +lookup(Stream) -> + gen_server:call(?MODULE, {lookup, Stream}). replicas_for_current_node() -> rabbit_mnesia:cluster_nodes(all) -- [node()]. @@ -167,8 +167,8 @@ handle_call({unregister, Pid}, _From, #state{listeners = Listeners, monitors = M maps:remove(Pid, Monitors) end, {reply, ok, State#state{listeners = lists:delete(Pid, Listeners), monitors = Monitors1}}; -handle_call({lookup, Target}, _From, State) -> - Res = case read(Target) of +handle_call({lookup, Stream}, _From, State) -> + Res = case read(Stream) of [] -> cluster_not_found; [#?MODULE{leader_pid = LeaderPid}] -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 0d2fd3f85c..014c3c2031 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -7,7 +7,7 @@ name, helper_sup, listen_socket, socket, clusters, data, consumers, - target_subscriptions, credits, + stream_subscriptions, credits, blocked, authentication_state, user, virtual_host, connection_step, % tcp_connected, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done @@ -16,7 +16,7 @@ }). -record(consumer, { - socket, leader, offset, subscription_id, segment, credit, target + socket, leader, offset, subscription_id, segment, credit, stream }). -record(configuration, { @@ -53,7 +53,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, helper_sup = KeepaliveSup, socket = RealSocket, data = none, clusters = #{}, - consumers = #{}, target_subscriptions = #{}, + consumers = #{}, stream_subscriptions = #{}, blocked = false, credits = Credits, authentication_state = none, user = none, connection_step = tcp_connected, @@ -125,7 +125,7 @@ close(Transport, S) -> Transport:close(S). listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Consumers, - target_subscriptions = TargetSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State, + stream_subscriptions = StreamSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> {OK, Closed, Error} = Transport:messages(), receive @@ -160,13 +160,13 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons listen_loop_post_auth(Transport, State2, Configuration) end; {stream_manager, cluster_deleted, ClusterReference} -> - Target = list_to_binary(ClusterReference), - State1 = case clean_state_after_target_deletion(Target, State) of + Stream = list_to_binary(ClusterReference), + State1 = case clean_state_after_stream_deletion(Stream, State) of {cleaned, NewState} -> - TargetSize = byte_size(Target), - FrameSize = 2 + 2 + 2 + 2 + TargetSize, + StreamSize = byte_size(Stream), + FrameSize = 2 + 2 + 2 + 2 + StreamSize, Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, - ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]), + ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]), NewState; {not_cleaned, SameState} -> SameState @@ -194,17 +194,17 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons State end, listen_loop_post_auth(Transport, State1, Configuration); - {osiris_offset, TargetName, -1} -> - error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [TargetName, -1]), + {osiris_offset, StreamName, -1} -> + error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]), listen_loop_post_auth(Transport, State, Configuration); - {osiris_offset, TargetName, Offset} when Offset > -1 -> - State1 = case maps:get(TargetName, TargetSubscriptions, undefined) of + {osiris_offset, StreamName, Offset} when Offset > -1 -> + State1 = case maps:get(StreamName, StreamSubscriptions, undefined) of undefined -> - error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [TargetName]), + error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [StreamName]), State; [] -> - error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [TargetName]), - State#stream_connection{target_subscriptions = maps:remove(TargetName, TargetSubscriptions)}; + error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [StreamName]), + State#stream_connection{stream_subscriptions = maps:remove(StreamName, StreamSubscriptions)}; CorrelationIds when is_list(CorrelationIds) -> Consumers1 = lists:foldl(fun(CorrelationId, ConsumersAcc) -> #{CorrelationId := Consumer} = ConsumersAcc, @@ -311,7 +311,7 @@ generate_publishing_error_details(Acc, <<>>) -> Acc; generate_publishing_error_details(Acc, <<PublishingId:64, MessageSize:32, _Message:MessageSize/binary, Rest/binary>>) -> generate_publishing_error_details( - <<Acc/binary, PublishingId:64, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16>>, + <<Acc/binary, PublishingId:64, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16>>, Rest). handle_frame_pre_auth(Transport, #stream_connection{socket = S} = State, @@ -441,9 +441,9 @@ handle_frame_pre_auth(_Transport, State, Frame, Rest) -> handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits} = State, <<?COMMAND_PUBLISH:16, ?VERSION_0:16, - TargetSize:16, Target:TargetSize/binary, + StreamSize:16, Stream:StreamSize/binary, MessageCount:32, Messages/binary>>, Rest) -> - case lookup_cluster(Target, State) of + case lookup_cluster(Stream, State) of cluster_not_found -> FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount, Details = generate_publishing_error_details(<<>>, Messages), @@ -455,16 +455,16 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi sub_credits(Credits, MessageCount), {State1, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, target_subscriptions = TargetSubscriptions} = State, - <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) -> - case lookup_cluster(Target, State) of +handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, stream_subscriptions = StreamSubscriptions} = State, + <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) -> + case lookup_cluster(Stream, State) of cluster_not_found -> - response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST), + response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {State, Rest}; {ClusterLeader, State1} -> - % offset message uses a list for the target, so storing this in the state for easier retrieval - TargetKey = binary_to_list(Target), - case subscription_exists(TargetSubscriptions, SubscriptionId) of + % offset message uses a list for the stream, so storing this in the state for easier retrieval + StreamKey = binary_to_list(Stream), + case subscription_exists(StreamSubscriptions, SubscriptionId) of true -> response(Transport, State1, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), {State1, Rest}; @@ -475,7 +475,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers leader = ClusterLeader, offset = Offset, subscription_id = SubscriptionId, socket = Socket, segment = Segment, credit = Credit, - target = TargetKey + stream = StreamKey }, error_logger:info_msg("registering consumer ~p in ~p~n", [ConsumerState, self()]), @@ -487,41 +487,41 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers ), Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}}, - TargetSubscriptions1 = - case TargetSubscriptions of - #{TargetKey := SubscriptionIds} -> - TargetSubscriptions#{TargetKey => [SubscriptionId] ++ SubscriptionIds}; + StreamSubscriptions1 = + case StreamSubscriptions of + #{StreamKey := SubscriptionIds} -> + StreamSubscriptions#{StreamKey => [SubscriptionId] ++ SubscriptionIds}; _ -> - TargetSubscriptions#{TargetKey => [SubscriptionId]} + StreamSubscriptions#{StreamKey => [SubscriptionId]} end, - {State1#stream_connection{consumers = Consumers1, target_subscriptions = TargetSubscriptions1}, Rest} + {State1#stream_connection{consumers = Consumers1, stream_subscriptions = StreamSubscriptions1}, Rest} end end; -handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, target_subscriptions = TargetSubscriptions, clusters = Clusters} = State, +handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, stream_subscriptions = StreamSubscriptions, clusters = Clusters} = State, <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32>>, Rest) -> - case subscription_exists(TargetSubscriptions, SubscriptionId) of + case subscription_exists(StreamSubscriptions, SubscriptionId) of false -> response(Transport, State, ?COMMAND_UNSUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST), {State, Rest}; true -> #{SubscriptionId := Consumer} = Consumers, - Target = Consumer#consumer.target, - #{Target := SubscriptionsForThisTarget} = TargetSubscriptions, - SubscriptionsForThisTarget1 = lists:delete(SubscriptionId, SubscriptionsForThisTarget), - {TargetSubscriptions1, Clusters1} = - case length(SubscriptionsForThisTarget1) of + Stream = Consumer#consumer.stream, + #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, + SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream), + {StreamSubscriptions1, Clusters1} = + case length(SubscriptionsForThisStream1) of 0 -> - %% no more subscriptions for this target - {maps:remove(Target, TargetSubscriptions), - maps:remove(list_to_binary(Target), Clusters) + %% no more subscriptions for this stream + {maps:remove(Stream, StreamSubscriptions), + maps:remove(list_to_binary(Stream), Clusters) }; _ -> - {TargetSubscriptions#{Target => SubscriptionsForThisTarget1}, Clusters} + {StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, Clusters} end, Consumers1 = maps:remove(SubscriptionId, Consumers), response_ok(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId), {State#stream_connection{consumers = Consumers1, - target_subscriptions = TargetSubscriptions1, + stream_subscriptions = StreamSubscriptions1, clusters = Clusters1 }, Rest} end; @@ -546,38 +546,38 @@ handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers} = St {State, Rest} end; handle_frame_post_auth(Transport, State, - <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) -> - case rabbit_stream_manager:create(binary_to_list(Target)) of + <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + case rabbit_stream_manager:create(binary_to_list(Stream)) of {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> error_logger:info_msg("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]), - response_ok(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId), + response_ok(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId), {State, Rest}; {error, reference_already_exists} -> - response(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_ALREADY_EXISTS), + response(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), {State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = State, - <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) -> - case rabbit_stream_manager:delete(binary_to_list(Target)) of + <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + case rabbit_stream_manager:delete(binary_to_list(Stream)) of {ok, deleted} -> - response_ok(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId), - State1 = case clean_state_after_target_deletion(Target, State) of + response_ok(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId), + State1 = case clean_state_after_stream_deletion(Stream, State) of {cleaned, NewState} -> - TargetSize = byte_size(Target), - FrameSize = 2 + 2 + 2 + 2 + TargetSize, + StreamSize = byte_size(Stream), + FrameSize = 2 + 2 + 2 + 2 + StreamSize, Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, - ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]), + ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]), NewState; {not_cleaned, SameState} -> SameState end, {State1, Rest}; {error, reference_not_found} -> - response(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST), + response(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = State, - <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, TargetCount:32, BinaryTargets/binary>>, Rest) -> + <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, Rest) -> %% FIXME: rely only on rabbit_networking to discover the listeners Nodes = rabbit_mnesia:cluster_nodes(all), {NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) -> @@ -592,13 +592,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State, <<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>> end, <<BrokersCount:32>>, NodesInfo), - Targets = extract_target_list(BinaryTargets, []), + Streams = extract_stream_list(BinaryStreams, []), - MetadataBin = lists:foldl(fun(Target, Acc) -> - TargetLength = byte_size(Target), - case lookup_cluster(Target, State) of + MetadataBin = lists:foldl(fun(Stream, Acc) -> + StreamLength = byte_size(Stream), + case lookup_cluster(Stream, State) of cluster_not_found -> - <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16, + <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16, -1:16, 0:32>>; {Cluster, _} -> LeaderNode = node(Cluster), @@ -611,11 +611,11 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State, end, <<>>, maps:values(Replicas)), ReplicasCount = maps:size(Replicas), - <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_OK:16, + <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16, LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>> end - end, <<TargetCount:32>>, Targets), + end, <<StreamCount:32>>, Streams), Frame = <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>, FrameSize = byte_size(Frame), Transport:send(S, <<FrameSize:32, Frame/binary>>), @@ -670,41 +670,41 @@ auth_mechanism_to_module(TypeBin, Sock) -> end end. -extract_target_list(<<>>, Targets) -> - Targets; -extract_target_list(<<Length:16, Target:Length/binary, Rest/binary>>, Targets) -> - extract_target_list(Rest, [Target | Targets]). +extract_stream_list(<<>>, Streams) -> + Streams; +extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) -> + extract_stream_list(Rest, [Stream | Streams]). -clean_state_after_target_deletion(Target, #stream_connection{clusters = Clusters, target_subscriptions = TargetSubscriptions, +clean_state_after_stream_deletion(Stream, #stream_connection{clusters = Clusters, stream_subscriptions = StreamSubscriptions, consumers = Consumers} = State) -> - TargetAsList = binary_to_list(Target), - case maps:is_key(TargetAsList, TargetSubscriptions) of + StreamAsList = binary_to_list(Stream), + case maps:is_key(StreamAsList, StreamSubscriptions) of true -> - #{TargetAsList := SubscriptionIds} = TargetSubscriptions, + #{StreamAsList := SubscriptionIds} = StreamSubscriptions, {cleaned, State#stream_connection{ - clusters = maps:remove(Target, Clusters), - target_subscriptions = maps:remove(TargetAsList, TargetSubscriptions), + clusters = maps:remove(Stream, Clusters), + stream_subscriptions = maps:remove(StreamAsList, StreamSubscriptions), consumers = maps:without(SubscriptionIds, Consumers) }}; false -> {not_cleaned, State} end. -lookup_cluster(Target, #stream_connection{clusters = Clusters} = State) -> - case maps:get(Target, Clusters, undefined) of +lookup_cluster(Stream, #stream_connection{clusters = Clusters} = State) -> + case maps:get(Stream, Clusters, undefined) of undefined -> - case lookup_cluster_from_manager(Target) of + case lookup_cluster_from_manager(Stream) of cluster_not_found -> cluster_not_found; ClusterPid -> - {ClusterPid, State#stream_connection{clusters = Clusters#{Target => ClusterPid}}} + {ClusterPid, State#stream_connection{clusters = Clusters#{Stream => ClusterPid}}} end; ClusterPid -> {ClusterPid, State} end. -lookup_cluster_from_manager(Target) -> - rabbit_stream_manager:lookup(Target). +lookup_cluster_from_manager(Stream) -> + rabbit_stream_manager:lookup(Stream). frame(Transport, #stream_connection{socket = S}, Frame) -> FrameSize = byte_size(Frame), @@ -716,8 +716,8 @@ response_ok(Transport, State, CommandId, CorrelationId) -> response(Transport, #stream_connection{socket = S}, CommandId, CorrelationId, ResponseCode) -> Transport:send(S, [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]). -subscription_exists(TargetSubscriptions, SubscriptionId) -> - SubscriptionIds = lists:flatten(maps:values(TargetSubscriptions)), +subscription_exists(StreamSubscriptions, SubscriptionId) -> + SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)), lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index cd876b0403..4799334816 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -75,15 +75,15 @@ test_server(Port) -> {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), test_authenticate(S), - Target = <<"target1">>, - test_create_target(S, Target), + Stream = <<"stream1">>, + test_create_stream(S, Stream), Body = <<"hello">>, - test_publish_confirm(S, Target, Body), + test_publish_confirm(S, Stream, Body), SubscriptionId = 42, - test_subscribe(S, SubscriptionId, Target), + test_subscribe(S, SubscriptionId, Stream), test_deliver(S, SubscriptionId, Body), - test_delete_target(S, Target), - test_metadata_update_target_deleted(S, Target), + test_delete_stream(S, Stream), + test_metadata_update_stream_deleted(S, Stream), test_close(S), closed = wait_for_socket_close(S, 10), ok. @@ -142,32 +142,32 @@ test_authenticate(S) -> {ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). -test_create_target(S, Target) -> - TargetSize = byte_size(Target), - CreateTargetFrame = <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>, - FrameSize = byte_size(CreateTargetFrame), - gen_tcp:send(S, <<FrameSize:32, CreateTargetFrame/binary>>), - {ok, <<_Size:32, ?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). +test_create_stream(S, Stream) -> + StreamSize = byte_size(Stream), + CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>, + FrameSize = byte_size(CreateStreamFrame), + gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>), + {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). -test_delete_target(S, Target) -> - TargetSize = byte_size(Target), - DeleteTargetFrame = <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>, - FrameSize = byte_size(DeleteTargetFrame), - gen_tcp:send(S, <<FrameSize:32, DeleteTargetFrame/binary>>), +test_delete_stream(S, Stream) -> + StreamSize = byte_size(Stream), + DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>, + FrameSize = byte_size(DeleteStreamFrame), + gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>), ResponseFrameSize = 10, - {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). + {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). -test_publish_confirm(S, Target, Body) -> +test_publish_confirm(S, Stream, Body) -> BodySize = byte_size(Body), - TargetSize = byte_size(Target), - PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, TargetSize:16, Target:TargetSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, + StreamSize = byte_size(Stream), + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). -test_subscribe(S, SubscriptionId, Target) -> - TargetSize = byte_size(Target), - SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, 0:64, 10:16>>, +test_subscribe(S, SubscriptionId, Stream) -> + StreamSize = byte_size(Stream), + SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, 0:64, 10:16>>, FrameSize = byte_size(SubscribeFrame), gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>), Res = gen_tcp:recv(S, 0, 5000), @@ -179,9 +179,9 @@ test_deliver(S, SubscriptionId, Body) -> <<48:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:32, 5:4/unsigned, 0:4/unsigned, 1:16, 1:32, _Epoch:64, 0:64, _Crc:32, _DataLength:32, 0:1, BodySize:31/unsigned, Body/binary>> = Frame. -test_metadata_update_target_deleted(S, Target) -> - TargetSize = byte_size(Target), - {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>} = gen_tcp:recv(S, 0, 5000). +test_metadata_update_stream_deleted(S, Stream) -> + StreamSize = byte_size(Stream), + {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000). test_close(S) -> CloseReason = <<"OK">>, |
