diff options
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 3 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 83 |
2 files changed, 86 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index d130c7225b..4e428495b0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -182,6 +182,8 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) -> last; {_, <<"next">>} -> next; + {timestamp, V} -> + {timestamp, V}; {_, V} -> V end, @@ -219,6 +221,7 @@ begin_stream(#stream_client{readers = Readers0} = State, first -> NextOffset; last -> NextOffset; next -> NextOffset; + {timestamp, _} -> NextOffset; _ -> Offset end, Str0 = #stream{name = amqqueue:get_name(Q), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 401e470eea..a1055458db 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -74,6 +74,8 @@ all_tests() -> consume_without_qos, consume, consume_offset, + consume_timestamp_offset, + consume_timestamp_last_offset, basic_get, consume_with_autoack, consume_and_nack, @@ -599,6 +601,87 @@ consume_offset(Config) -> end) end, [], 25). +consume_timestamp_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Payload = <<"111">>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + Offset = erlang:system_time(millisecond) - 600000, + amqp_channel:subscribe( + Ch1, + #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, timestamp, Offset}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% It has subscribed to a very old timestamp, so we will receive the whole stream + receive_batch(Ch1, 0, 99). + +consume_timestamp_last_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"111">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + %% Subscribe from now/future + Offset = erlang:system_time(millisecond) + 60000, + amqp_channel:subscribe( + Ch1, + #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, timestamp, Offset}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + receive + {_, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} + when S < 100 -> + exit({unexpected_offset, S}) + after 1000 -> + ok + end, + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). + basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |