diff options
author | Marcial Rosales <mrosales@pivotal.io> | 2022-09-27 17:36:17 +0200 |
---|---|---|
committer | Marcial Rosales <mrosales@pivotal.io> | 2022-09-27 17:36:17 +0200 |
commit | 3052691d88ab297d05dcf6639c0b224eb614cab4 (patch) | |
tree | e4f37e5a4b5f288f1e29047f97a17449b33fdab8 | |
parent | c4bc43b01603c2bd2a173c6dac8c8acf56ec1db9 (diff) | |
download | rabbitmq-server-git-add-stomp-headers-for-streams.tar.gz |
Add 2 additional stream queue argumentsadd-stomp-headers-for-streams
x-max-age, x-stream-max-segment-size-bytes
-rw-r--r-- | deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl | 6 | ||||
-rw-r--r-- | deps/rabbitmq_stomp/src/rabbit_stomp_util.erl | 8 | ||||
-rw-r--r-- | deps/rabbitmq_stomp/test/util_SUITE.erl | 26 |
3 files changed, 37 insertions, 3 deletions
diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 4b41b71def..7e6a87d925 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -48,6 +48,8 @@ -define(HEADER_X_MESSAGE_TTL, "x-message-ttl"). -define(HEADER_X_QUEUE_NAME, "x-queue-name"). -define(HEADER_X_QUEUE_TYPE, "x-queue-type"). +-define(HEADER_X_MAX_AGE, "x-max-age"). +-define(HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, "x-stream-max-segment-size-bytes"). -define(MESSAGE_ID_SEPARATOR, "@@"). @@ -63,7 +65,9 @@ ?HEADER_X_MAX_LENGTH_BYTES, ?HEADER_X_MAX_PRIORITY, ?HEADER_X_MESSAGE_TTL, - ?HEADER_X_QUEUE_TYPE + ?HEADER_X_QUEUE_TYPE, + ?HEADER_X_MAX_AGE, + ?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES ]). -define(HEADER_PARAMS, [ diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 4f3b4f612c..8a450c4596 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -291,7 +291,13 @@ build_argument(?HEADER_X_MESSAGE_TTL, Val) -> list_to_integer(string:strip(Val))}; build_argument(?HEADER_X_QUEUE_TYPE, Val) -> {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr, - list_to_binary(string:strip(Val))}. + list_to_binary(string:strip(Val))}; +build_argument(?HEADER_X_MAX_AGE, Val) -> + {list_to_binary(?HEADER_X_MAX_AGE), longstr, + list_to_binary(string:strip(Val))}; +build_argument(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, Val) -> + {list_to_binary(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES), long, + list_to_integer(string:strip(Val))}. build_params(EndPoint, Headers) -> Params = lists:foldl(fun({K, V}, Acc) -> diff --git a/deps/rabbitmq_stomp/test/util_SUITE.erl b/deps/rabbitmq_stomp/test/util_SUITE.erl index 0b04d9183c..e6e55c827c 100644 --- a/deps/rabbitmq_stomp/test/util_SUITE.erl +++ b/deps/rabbitmq_stomp/test/util_SUITE.erl @@ -12,6 +12,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/rabbit_routing_prefixes.hrl"). -include("rabbit_stomp_frame.hrl"). +-include("rabbit_stomp_headers.hrl"). -compile(export_all). all() -> [ @@ -39,7 +40,11 @@ all() -> [ consumer_tag_destination, consumer_tag_invalid, parse_valid_message_id, - parse_invalid_message_id + parse_invalid_message_id, + build_arguments_with_x_queue_type_header, + build_arguments_with_x_max_length_bytes_header, + build_arguments_with_x_max_age_header, + build_arguments_with_x_stream_max_segment_size_bytes_header ]. @@ -240,3 +245,22 @@ parse_invalid_message_id(_) -> {error, invalid_message_id} = rabbit_stomp_util:parse_message_id("blah"). +%%--- +%% Stream-related headers mapped into queue declaration arguments +%%--- + +build_arguments_with_x_queue_type_header(_) -> + Headers = [{"x-queue-type", "stream"}], + { arguments, [ {<<?HEADER_X_QUEUE_TYPE>>, longstr, <<"stream">>} ]} = rabbit_stomp_util:build_arguments(Headers). + +build_arguments_with_x_max_length_bytes_header(_) -> + Headers = [{"x-max-length-bytes", "100"}], + { arguments, [ {<<?HEADER_X_MAX_LENGTH_BYTES>>, long, 100} ]} = rabbit_stomp_util:build_arguments(Headers). + +build_arguments_with_x_max_age_header(_) -> + Headers = [{"x-max-age", "2D"}], + { arguments, [ {<<?HEADER_X_MAX_AGE>>, longstr, <<"2D">>} ]} = rabbit_stomp_util:build_arguments(Headers). + +build_arguments_with_x_stream_max_segment_size_bytes_header(_) -> + Headers = [{"x-stream-max-segment-size-bytes", "20000"}], + { arguments, [ {<<?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES>>, long, 20000} ]} = rabbit_stomp_util:build_arguments(Headers). |