summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-05-06 09:15:16 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-05-06 09:15:16 +0200
commit2d19e859252c7e143c508aa147269e987c2c7f16 (patch)
treee6134adc78f2a41dbca0e416677e9628c37c6acb
parentb61a79b9ffa20fadc8e2ef52403f0e1a6972c7c7 (diff)
downloadrabbitmq-server-git-2d19e859252c7e143c508aa147269e987c2c7f16.tar.gz
Use stream instead of target
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl10
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl8
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl166
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl54
4 files changed, 119 insertions, 119 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index b6fcb0f6ab..7e46a6f87c 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -13,17 +13,17 @@
-define(COMMAND_OPEN, 12).
-define(COMMAND_CLOSE, 13).
-define(COMMAND_HEARTBEAT, 14).
--define(COMMAND_CREATE_TARGET, 998).
--define(COMMAND_DELETE_TARGET, 999).
+-define(COMMAND_CREATE_STREAM, 998).
+-define(COMMAND_DELETE_STREAM, 999).
-define(VERSION_0, 0).
-define(RESPONSE_CODE_OK, 0).
--define(RESPONSE_CODE_TARGET_DOES_NOT_EXIST, 1).
+-define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 1).
-define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2).
-define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3).
--define(RESPONSE_CODE_TARGET_ALREADY_EXISTS, 4).
--define(RESPONSE_CODE_TARGET_DELETED, 5).
+-define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 4).
+-define(RESPONSE_CODE_STREAM_DELETED, 5).
-define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 6).
-define(RESPONSE_AUTHENTICATION_FAILURE, 7).
-define(RESPONSE_SASL_ERROR, 8).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index b6ecb342c7..6c933fa676 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -101,8 +101,8 @@ register() ->
unregister() ->
gen_server:call(?MODULE, {unregister, self()}).
-lookup(Target) ->
- gen_server:call(?MODULE, {lookup, Target}).
+lookup(Stream) ->
+ gen_server:call(?MODULE, {lookup, Stream}).
replicas_for_current_node() ->
rabbit_mnesia:cluster_nodes(all) -- [node()].
@@ -167,8 +167,8 @@ handle_call({unregister, Pid}, _From, #state{listeners = Listeners, monitors = M
maps:remove(Pid, Monitors)
end,
{reply, ok, State#state{listeners = lists:delete(Pid, Listeners), monitors = Monitors1}};
-handle_call({lookup, Target}, _From, State) ->
- Res = case read(Target) of
+handle_call({lookup, Stream}, _From, State) ->
+ Res = case read(Stream) of
[] ->
cluster_not_found;
[#?MODULE{leader_pid = LeaderPid}] ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 0d2fd3f85c..014c3c2031 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -7,7 +7,7 @@
name,
helper_sup,
listen_socket, socket, clusters, data, consumers,
- target_subscriptions, credits,
+ stream_subscriptions, credits,
blocked,
authentication_state, user, virtual_host,
connection_step, % tcp_connected, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
@@ -16,7 +16,7 @@
}).
-record(consumer, {
- socket, leader, offset, subscription_id, segment, credit, target
+ socket, leader, offset, subscription_id, segment, credit, stream
}).
-record(configuration, {
@@ -53,7 +53,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
helper_sup = KeepaliveSup,
socket = RealSocket, data = none,
clusters = #{},
- consumers = #{}, target_subscriptions = #{},
+ consumers = #{}, stream_subscriptions = #{},
blocked = false, credits = Credits,
authentication_state = none, user = none,
connection_step = tcp_connected,
@@ -125,7 +125,7 @@ close(Transport, S) ->
Transport:close(S).
listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Consumers,
- target_subscriptions = TargetSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State,
+ stream_subscriptions = StreamSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
{OK, Closed, Error} = Transport:messages(),
receive
@@ -160,13 +160,13 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons
listen_loop_post_auth(Transport, State2, Configuration)
end;
{stream_manager, cluster_deleted, ClusterReference} ->
- Target = list_to_binary(ClusterReference),
- State1 = case clean_state_after_target_deletion(Target, State) of
+ Stream = list_to_binary(ClusterReference),
+ State1 = case clean_state_after_stream_deletion(Stream, State) of
{cleaned, NewState} ->
- TargetSize = byte_size(Target),
- FrameSize = 2 + 2 + 2 + 2 + TargetSize,
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
- ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
+ ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
NewState;
{not_cleaned, SameState} ->
SameState
@@ -194,17 +194,17 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons
State
end,
listen_loop_post_auth(Transport, State1, Configuration);
- {osiris_offset, TargetName, -1} ->
- error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [TargetName, -1]),
+ {osiris_offset, StreamName, -1} ->
+ error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]),
listen_loop_post_auth(Transport, State, Configuration);
- {osiris_offset, TargetName, Offset} when Offset > -1 ->
- State1 = case maps:get(TargetName, TargetSubscriptions, undefined) of
+ {osiris_offset, StreamName, Offset} when Offset > -1 ->
+ State1 = case maps:get(StreamName, StreamSubscriptions, undefined) of
undefined ->
- error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [TargetName]),
+ error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [StreamName]),
State;
[] ->
- error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [TargetName]),
- State#stream_connection{target_subscriptions = maps:remove(TargetName, TargetSubscriptions)};
+ error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [StreamName]),
+ State#stream_connection{stream_subscriptions = maps:remove(StreamName, StreamSubscriptions)};
CorrelationIds when is_list(CorrelationIds) ->
Consumers1 = lists:foldl(fun(CorrelationId, ConsumersAcc) ->
#{CorrelationId := Consumer} = ConsumersAcc,
@@ -311,7 +311,7 @@ generate_publishing_error_details(Acc, <<>>) ->
Acc;
generate_publishing_error_details(Acc, <<PublishingId:64, MessageSize:32, _Message:MessageSize/binary, Rest/binary>>) ->
generate_publishing_error_details(
- <<Acc/binary, PublishingId:64, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16>>,
+ <<Acc/binary, PublishingId:64, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16>>,
Rest).
handle_frame_pre_auth(Transport, #stream_connection{socket = S} = State,
@@ -441,9 +441,9 @@ handle_frame_pre_auth(_Transport, State, Frame, Rest) ->
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits} = State,
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
- TargetSize:16, Target:TargetSize/binary,
+ StreamSize:16, Stream:StreamSize/binary,
MessageCount:32, Messages/binary>>, Rest) ->
- case lookup_cluster(Target, State) of
+ case lookup_cluster(Stream, State) of
cluster_not_found ->
FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount,
Details = generate_publishing_error_details(<<>>, Messages),
@@ -455,16 +455,16 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
sub_credits(Credits, MessageCount),
{State1, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, target_subscriptions = TargetSubscriptions} = State,
- <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) ->
- case lookup_cluster(Target, State) of
+handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, stream_subscriptions = StreamSubscriptions} = State,
+ <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) ->
+ case lookup_cluster(Stream, State) of
cluster_not_found ->
- response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
+ response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{State, Rest};
{ClusterLeader, State1} ->
- % offset message uses a list for the target, so storing this in the state for easier retrieval
- TargetKey = binary_to_list(Target),
- case subscription_exists(TargetSubscriptions, SubscriptionId) of
+ % offset message uses a list for the stream, so storing this in the state for easier retrieval
+ StreamKey = binary_to_list(Stream),
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
true ->
response(Transport, State1, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
{State1, Rest};
@@ -475,7 +475,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers
leader = ClusterLeader, offset = Offset, subscription_id = SubscriptionId, socket = Socket,
segment = Segment,
credit = Credit,
- target = TargetKey
+ stream = StreamKey
},
error_logger:info_msg("registering consumer ~p in ~p~n", [ConsumerState, self()]),
@@ -487,41 +487,41 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers
),
Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}},
- TargetSubscriptions1 =
- case TargetSubscriptions of
- #{TargetKey := SubscriptionIds} ->
- TargetSubscriptions#{TargetKey => [SubscriptionId] ++ SubscriptionIds};
+ StreamSubscriptions1 =
+ case StreamSubscriptions of
+ #{StreamKey := SubscriptionIds} ->
+ StreamSubscriptions#{StreamKey => [SubscriptionId] ++ SubscriptionIds};
_ ->
- TargetSubscriptions#{TargetKey => [SubscriptionId]}
+ StreamSubscriptions#{StreamKey => [SubscriptionId]}
end,
- {State1#stream_connection{consumers = Consumers1, target_subscriptions = TargetSubscriptions1}, Rest}
+ {State1#stream_connection{consumers = Consumers1, stream_subscriptions = StreamSubscriptions1}, Rest}
end
end;
-handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, target_subscriptions = TargetSubscriptions, clusters = Clusters} = State,
+handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, stream_subscriptions = StreamSubscriptions, clusters = Clusters} = State,
<<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32>>, Rest) ->
- case subscription_exists(TargetSubscriptions, SubscriptionId) of
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
false ->
response(Transport, State, ?COMMAND_UNSUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
{State, Rest};
true ->
#{SubscriptionId := Consumer} = Consumers,
- Target = Consumer#consumer.target,
- #{Target := SubscriptionsForThisTarget} = TargetSubscriptions,
- SubscriptionsForThisTarget1 = lists:delete(SubscriptionId, SubscriptionsForThisTarget),
- {TargetSubscriptions1, Clusters1} =
- case length(SubscriptionsForThisTarget1) of
+ Stream = Consumer#consumer.stream,
+ #{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
+ SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
+ {StreamSubscriptions1, Clusters1} =
+ case length(SubscriptionsForThisStream1) of
0 ->
- %% no more subscriptions for this target
- {maps:remove(Target, TargetSubscriptions),
- maps:remove(list_to_binary(Target), Clusters)
+ %% no more subscriptions for this stream
+ {maps:remove(Stream, StreamSubscriptions),
+ maps:remove(list_to_binary(Stream), Clusters)
};
_ ->
- {TargetSubscriptions#{Target => SubscriptionsForThisTarget1}, Clusters}
+ {StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, Clusters}
end,
Consumers1 = maps:remove(SubscriptionId, Consumers),
response_ok(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId),
{State#stream_connection{consumers = Consumers1,
- target_subscriptions = TargetSubscriptions1,
+ stream_subscriptions = StreamSubscriptions1,
clusters = Clusters1
}, Rest}
end;
@@ -546,38 +546,38 @@ handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers} = St
{State, Rest}
end;
handle_frame_post_auth(Transport, State,
- <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
- case rabbit_stream_manager:create(binary_to_list(Target)) of
+ <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ case rabbit_stream_manager:create(binary_to_list(Stream)) of
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
error_logger:info_msg("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
- response_ok(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId),
+ response_ok(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId),
{State, Rest};
{error, reference_already_exists} ->
- response(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_ALREADY_EXISTS),
+ response(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
{State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
- <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
- case rabbit_stream_manager:delete(binary_to_list(Target)) of
+ <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ case rabbit_stream_manager:delete(binary_to_list(Stream)) of
{ok, deleted} ->
- response_ok(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId),
- State1 = case clean_state_after_target_deletion(Target, State) of
+ response_ok(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId),
+ State1 = case clean_state_after_stream_deletion(Stream, State) of
{cleaned, NewState} ->
- TargetSize = byte_size(Target),
- FrameSize = 2 + 2 + 2 + 2 + TargetSize,
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
- ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
+ ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
NewState;
{not_cleaned, SameState} ->
SameState
end,
{State1, Rest};
{error, reference_not_found} ->
- response(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
+ response(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
- <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, TargetCount:32, BinaryTargets/binary>>, Rest) ->
+ <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, Rest) ->
%% FIXME: rely only on rabbit_networking to discover the listeners
Nodes = rabbit_mnesia:cluster_nodes(all),
{NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
@@ -592,13 +592,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
<<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
end, <<BrokersCount:32>>, NodesInfo),
- Targets = extract_target_list(BinaryTargets, []),
+ Streams = extract_stream_list(BinaryStreams, []),
- MetadataBin = lists:foldl(fun(Target, Acc) ->
- TargetLength = byte_size(Target),
- case lookup_cluster(Target, State) of
+ MetadataBin = lists:foldl(fun(Stream, Acc) ->
+ StreamLength = byte_size(Stream),
+ case lookup_cluster(Stream, State) of
cluster_not_found ->
- <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16,
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
-1:16, 0:32>>;
{Cluster, _} ->
LeaderNode = node(Cluster),
@@ -611,11 +611,11 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
end, <<>>, maps:values(Replicas)),
ReplicasCount = maps:size(Replicas),
- <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_OK:16,
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16,
LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
end
- end, <<TargetCount:32>>, Targets),
+ end, <<StreamCount:32>>, Streams),
Frame = <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>,
FrameSize = byte_size(Frame),
Transport:send(S, <<FrameSize:32, Frame/binary>>),
@@ -670,41 +670,41 @@ auth_mechanism_to_module(TypeBin, Sock) ->
end
end.
-extract_target_list(<<>>, Targets) ->
- Targets;
-extract_target_list(<<Length:16, Target:Length/binary, Rest/binary>>, Targets) ->
- extract_target_list(Rest, [Target | Targets]).
+extract_stream_list(<<>>, Streams) ->
+ Streams;
+extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
+ extract_stream_list(Rest, [Stream | Streams]).
-clean_state_after_target_deletion(Target, #stream_connection{clusters = Clusters, target_subscriptions = TargetSubscriptions,
+clean_state_after_stream_deletion(Stream, #stream_connection{clusters = Clusters, stream_subscriptions = StreamSubscriptions,
consumers = Consumers} = State) ->
- TargetAsList = binary_to_list(Target),
- case maps:is_key(TargetAsList, TargetSubscriptions) of
+ StreamAsList = binary_to_list(Stream),
+ case maps:is_key(StreamAsList, StreamSubscriptions) of
true ->
- #{TargetAsList := SubscriptionIds} = TargetSubscriptions,
+ #{StreamAsList := SubscriptionIds} = StreamSubscriptions,
{cleaned, State#stream_connection{
- clusters = maps:remove(Target, Clusters),
- target_subscriptions = maps:remove(TargetAsList, TargetSubscriptions),
+ clusters = maps:remove(Stream, Clusters),
+ stream_subscriptions = maps:remove(StreamAsList, StreamSubscriptions),
consumers = maps:without(SubscriptionIds, Consumers)
}};
false ->
{not_cleaned, State}
end.
-lookup_cluster(Target, #stream_connection{clusters = Clusters} = State) ->
- case maps:get(Target, Clusters, undefined) of
+lookup_cluster(Stream, #stream_connection{clusters = Clusters} = State) ->
+ case maps:get(Stream, Clusters, undefined) of
undefined ->
- case lookup_cluster_from_manager(Target) of
+ case lookup_cluster_from_manager(Stream) of
cluster_not_found ->
cluster_not_found;
ClusterPid ->
- {ClusterPid, State#stream_connection{clusters = Clusters#{Target => ClusterPid}}}
+ {ClusterPid, State#stream_connection{clusters = Clusters#{Stream => ClusterPid}}}
end;
ClusterPid ->
{ClusterPid, State}
end.
-lookup_cluster_from_manager(Target) ->
- rabbit_stream_manager:lookup(Target).
+lookup_cluster_from_manager(Stream) ->
+ rabbit_stream_manager:lookup(Stream).
frame(Transport, #stream_connection{socket = S}, Frame) ->
FrameSize = byte_size(Frame),
@@ -716,8 +716,8 @@ response_ok(Transport, State, CommandId, CorrelationId) ->
response(Transport, #stream_connection{socket = S}, CommandId, CorrelationId, ResponseCode) ->
Transport:send(S, [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]).
-subscription_exists(TargetSubscriptions, SubscriptionId) ->
- SubscriptionIds = lists:flatten(maps:values(TargetSubscriptions)),
+subscription_exists(StreamSubscriptions, SubscriptionId) ->
+ SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)),
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) ->
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index cd876b0403..4799334816 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -75,15 +75,15 @@ test_server(Port) ->
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
{mode, binary}]),
test_authenticate(S),
- Target = <<"target1">>,
- test_create_target(S, Target),
+ Stream = <<"stream1">>,
+ test_create_stream(S, Stream),
Body = <<"hello">>,
- test_publish_confirm(S, Target, Body),
+ test_publish_confirm(S, Stream, Body),
SubscriptionId = 42,
- test_subscribe(S, SubscriptionId, Target),
+ test_subscribe(S, SubscriptionId, Stream),
test_deliver(S, SubscriptionId, Body),
- test_delete_target(S, Target),
- test_metadata_update_target_deleted(S, Target),
+ test_delete_stream(S, Stream),
+ test_metadata_update_stream_deleted(S, Stream),
test_close(S),
closed = wait_for_socket_close(S, 10),
ok.
@@ -142,32 +142,32 @@ test_authenticate(S) ->
{ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
-test_create_target(S, Target) ->
- TargetSize = byte_size(Target),
- CreateTargetFrame = <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
- FrameSize = byte_size(CreateTargetFrame),
- gen_tcp:send(S, <<FrameSize:32, CreateTargetFrame/binary>>),
- {ok, <<_Size:32, ?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
+test_create_stream(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
+ FrameSize = byte_size(CreateStreamFrame),
+ gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
+ {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
-test_delete_target(S, Target) ->
- TargetSize = byte_size(Target),
- DeleteTargetFrame = <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
- FrameSize = byte_size(DeleteTargetFrame),
- gen_tcp:send(S, <<FrameSize:32, DeleteTargetFrame/binary>>),
+test_delete_stream(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
+ FrameSize = byte_size(DeleteStreamFrame),
+ gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>),
ResponseFrameSize = 10,
- {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
+ {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
-test_publish_confirm(S, Target, Body) ->
+test_publish_confirm(S, Stream, Body) ->
BodySize = byte_size(Body),
- TargetSize = byte_size(Target),
- PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, TargetSize:16, Target:TargetSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
+ StreamSize = byte_size(Stream),
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
{ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
-test_subscribe(S, SubscriptionId, Target) ->
- TargetSize = byte_size(Target),
- SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, 0:64, 10:16>>,
+test_subscribe(S, SubscriptionId, Stream) ->
+ StreamSize = byte_size(Stream),
+ SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, 0:64, 10:16>>,
FrameSize = byte_size(SubscribeFrame),
gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
Res = gen_tcp:recv(S, 0, 5000),
@@ -179,9 +179,9 @@ test_deliver(S, SubscriptionId, Body) ->
<<48:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:32, 5:4/unsigned, 0:4/unsigned, 1:16, 1:32, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
0:1, BodySize:31/unsigned, Body/binary>> = Frame.
-test_metadata_update_target_deleted(S, Target) ->
- TargetSize = byte_size(Target),
- {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>} = gen_tcp:recv(S, 0, 5000).
+test_metadata_update_stream_deleted(S, Stream) ->
+ StreamSize = byte_size(Stream),
+ {ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000).
test_close(S) ->
CloseReason = <<"OK">>,