diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-19 10:58:14 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-19 10:58:14 +0200 |
commit | db5d3f8fa03ea6ed5c40c3d69f415cd875b7a89e (patch) | |
tree | cff0c2b015c70fea81e310a1968bbb61552b4f49 | |
parent | e8b343b3e6ef5e95532117cbad97f21931e328c8 (diff) | |
download | rabbitmq-server-git-stream-queue-leader-locator-with-hash.tar.gz |
Use stream ID hash to pick "random" leaderstream-queue-leader-locator-with-hash
-rw-r--r-- | src/rabbit_stream_coordinator.erl | 5 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 12 |
2 files changed, 7 insertions, 10 deletions
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl index 7b48d778d0..8a1205b9af 100644 --- a/src/rabbit_stream_coordinator.erl +++ b/src/rabbit_stream_coordinator.erl @@ -13,7 +13,6 @@ %% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved. %% -module(rabbit_stream_coordinator). --include("rabbit.hrl"). -behaviour(ra_machine). @@ -912,10 +911,10 @@ apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = apply_leader_locator_strategy(#{leader_node := Leader, replica_nodes := Replicas0, leader_locator_strategy := <<"random">>, - reference := #resource{name = Name}} = Conf, _) -> + name := StreamId} = Conf, _) -> Replicas = [Leader | Replicas0], ClusterSize = length(Replicas), - Hash = erlang:phash2(Name), + Hash = erlang:phash2(StreamId), Pos = (Hash rem ClusterSize) + 1, NewLeader = lists:nth(Pos, Replicas), NewReplicas = lists:delete(NewLeader, Replicas), diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index cb7de46268..d6275eae17 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -1267,23 +1267,21 @@ leader_locator_random(Config) -> repeat_until( fun() -> - QName = base64:encode(crypto:strong_rand_bytes(20)), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - ?assertEqual({'queue.declare_ok', QName, 0, 0}, - declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"stream">>}, + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), [Info2] = lists:filter( fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, QName)}, Props) + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) end, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all, [<<"/">>, [name, leader]])), Leader2 = proplists:get_value(leader, Info2), - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - Leader =/= Leader2 end, 10). |