diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 14 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 32 |
2 files changed, 44 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index cc6700b42f..65f21a101c 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -158,7 +158,19 @@ policy_changed(Q) -> ok. stat(Q) -> - {ok, i(messages, Q), 0}. + QName = amqqueue:get_name(Q), + Conf = amqqueue:get_type_state(Q), + case maps:get(leader_node, Conf) of + Node when Node =/= node() -> + case rpc:call(Node, ?MODULE, info, [Q, [messages]]) of + {badrpc, _} -> + {ok, 0, 0}; + [{messages, Messages}] -> + {ok, Messages, 0} + end; + _ -> + {ok, i(messages, Q), 0} + end. consume(Q, #{prefetch_count := 0}, _) when ?amqqueue_is_stream(Q) -> diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 6896d76f58..12aadff011 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(nowarn_export_all). -compile(export_all). @@ -48,7 +49,8 @@ groups() -> leader_failover_dedupe, add_replicas, publish_coordinator_unavailable, - leader_locator_policy]}, + leader_locator_policy, + queue_size_on_declare]}, {cluster_size_3_1, [], [shrink_coordinator_cluster]}, {cluster_size_3_2, [], [recover, declare_with_node_down]}, @@ -1799,6 +1801,34 @@ leader_locator_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +queue_size_on_declare(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]), + + %% Metrics update is not synchronous, wait until metrics are updated on the leader node. + %% Afterwards, all replicas will get the right size as they have to query the writer node + ?awaitMatch({'queue.declare_ok', Q, 100, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]), + 60000), + amqp_channel:close(Ch1), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + ?assertEqual({'queue.declare_ok', Q, 100, 0}, + declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + amqp_channel:close(Ch2), + + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3), + ?assertEqual({'queue.declare_ok', Q, 100, 0}, + declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + amqp_channel:close(Ch3), + + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + repeat_until(_, 0) -> ct:fail("Condition did not materialize in the expected amount of attempts"); repeat_until(Fun, N) -> |