summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_stream_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_stream_queue.erl')
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl734
1 file changed, 734 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
new file mode 100644
index 0000000000..4e428495b0
--- /dev/null
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -0,0 +1,734 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_queue).
+
+-behaviour(rabbit_queue_type).
+
+-export([is_enabled/0,
+ declare/2,
+ delete/4,
+ purge/1,
+ policy_changed/1,
+ recover/2,
+ is_recoverable/1,
+ consume/3,
+ cancel/5,
+ handle_event/2,
+ deliver/2,
+ settle/4,
+ credit/4,
+ dequeue/4,
+ info/2,
+ init/1,
+ close/1,
+ update/2,
+ state_info/1,
+ stat/1,
+ capabilities/0]).
+
+-export([set_retention_policy/3]).
+-export([add_replica/3,
+ delete_replica/3]).
+-export([format_osiris_event/2]).
+-export([update_stream_conf/1]).
+
+-include("rabbit.hrl").
+-include("amqqueue.hrl").
+
+-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
+ messages, messages_ready, messages_unacknowledged, committed_offset,
+ policy, operator_policy, effective_policy_definition, type]).
+
+-type appender_seq() :: non_neg_integer().
+
+-record(stream, {name :: rabbit_types:r('queue'),
+ credit :: integer(),
+ max :: non_neg_integer(),
+ start_offset = 0 :: non_neg_integer(),
+ listening_offset = 0 :: non_neg_integer(),
+ log :: undefined | osiris_log:state()}).
+
+-record(stream_client, {name :: term(),
+ leader :: pid(),
+ next_seq = 1 :: non_neg_integer(),
+ correlation = #{} :: #{appender_seq() => term()},
+ soft_limit :: non_neg_integer(),
+ slow = false :: boolean(),
+ readers = #{} :: #{term() => #stream{}}
+ }).
+
+-import(rabbit_queue_type_util, [args_policy_lookup/3]).
+
+-type client() :: #stream_client{}.
+
+-spec is_enabled() -> boolean().
+%% Returns true when the `stream_queue' feature flag is enabled on
+%% this node; the queue type is unusable until then.
+is_enabled() ->
+    rabbit_feature_flags:is_enabled(stream_queue).
+
+-spec declare(amqqueue:amqqueue(), node()) ->
+    {'new' | 'existing', amqqueue:amqqueue()} |
+    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+%% Declares a stream queue. Streams must be durable, non-exclusive and
+%% not auto-delete; the checks produce the protocol_error tuple that is
+%% returned unchanged on failure. On success the osiris cluster is
+%% started via start_cluster/2.
+declare(Q0, Node) when ?amqqueue_is_stream(Q0) ->
+    case rabbit_queue_type_util:run_checks(
+           [fun rabbit_queue_type_util:check_auto_delete/1,
+            fun rabbit_queue_type_util:check_exclusive/1,
+            fun rabbit_queue_type_util:check_non_durable/1],
+           Q0) of
+        ok ->
+            start_cluster(Q0, Node);
+        Err ->
+            Err
+    end.
+
+%% Builds the stream configuration and asks the coordinator to start
+%% the osiris cluster, translating each coordinator outcome into the
+%% rabbit_queue_type declare contract. Emits a queue_created event on
+%% success; deletes the queue record again when the cluster could not
+%% be created.
+start_cluster(Q0, Node) ->
+    Arguments = amqqueue:get_arguments(Q0),
+    QName = amqqueue:get_name(Q0),
+    Opts = amqqueue:get_options(Q0),
+    ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
+    Conf0 = make_stream_conf(Node, Q0),
+    case rabbit_stream_coordinator:start_cluster(
+           amqqueue:set_type_state(Q0, Conf0)) of
+        {ok, {error, already_started}, _} ->
+            %% The generated osiris stream name clashed with an existing
+            %% stream. Format the offending name into the message; the
+            %% previous code formatted Node into a message that talks
+            %% about a queue name.
+            {protocol_error, precondition_failed, "safe queue name already in use '~s'",
+             [maps:get(name, Conf0)]};
+        {ok, {created, Q}, _} ->
+            rabbit_event:notify(queue_created,
+                                [{name, QName},
+                                 {durable, true},
+                                 {auto_delete, false},
+                                 {arguments, Arguments},
+                                 {user_who_performed_action,
+                                  ActingUser}]),
+            {new, Q};
+        {ok, {error, Error}, _} ->
+            %% Cluster start failed: roll back the queue record.
+            _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+            {protocol_error, internal_error, "Cannot declare a queue '~s' on node '~s': ~255p",
+             [rabbit_misc:rs(QName), node(), Error]};
+        {ok, {existing, Q}, _} ->
+            {existing, Q};
+        {error, coordinator_unavailable} ->
+            _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+            {protocol_error, internal_error,
+             "Cannot declare a queue '~s' on node '~s': coordinator unavailable",
+             [rabbit_misc:rs(QName), node()]}
+    end.
+
+-spec delete(amqqueue:amqqueue(), boolean(),
+             boolean(), rabbit_types:username()) ->
+    rabbit_types:ok(non_neg_integer()) |
+    rabbit_types:error(in_use | not_empty).
+%% Deletes the whole stream cluster behind Q via the coordinator.
+%% The if-unused / if-empty flags are ignored for stream queues.
+delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
+    Name = maps:get(name, amqqueue:get_type_state(Q)),
+    {ok, Reply, _} = rabbit_stream_coordinator:delete_cluster(Name, ActingUser),
+    Reply.
+
+-spec purge(amqqueue:amqqueue()) ->
+    {ok, non_neg_integer()} | {error, term()}.
+%% Streams cannot be purged; data is only removed by retention.
+purge(_) ->
+    {error, not_supported}.
+
+-spec policy_changed(amqqueue:amqqueue()) -> 'ok'.
+%% Forwards a policy change to the coordinator so policy-derived stream
+%% settings can be re-evaluated; the coordinator's reply is ignored.
+policy_changed(Q) ->
+    Name = maps:get(name, amqqueue:get_type_state(Q)),
+    _ = rabbit_stream_coordinator:policy_changed(Name),
+    ok.
+
+%% Returns hard-coded {ok, MsgCount = 0, ConsumerCount = 0}; precise
+%% counts are not computed here (see i/2 for metrics-table lookups).
+stat(_) ->
+    {ok, 0, 0}.
+
+%% Sets up a consumer on a stream queue. Streams require an explicit
+%% prefetch count and manual acknowledgements, and global QoS is not
+%% supported; clause order implements that validation. The consumer is
+%% attached to a stream member on the local node (a local replica or
+%% the leader).
+consume(Q, #{prefetch_count := 0}, _)
+  when ?amqqueue_is_stream(Q) ->
+    {protocol_error, precondition_failed, "consumer prefetch count is not set for '~s'",
+     [rabbit_misc:rs(amqqueue:get_name(Q))]};
+consume(Q, #{no_ack := true}, _)
+  when ?amqqueue_is_stream(Q) ->
+    {protocol_error, not_implemented,
+     "automatic acknowledgement not supported by stream queues ~s",
+     [rabbit_misc:rs(amqqueue:get_name(Q))]};
+consume(Q, #{limiter_active := true}, _State)
+  when ?amqqueue_is_stream(Q) ->
+    {error, global_qos_not_supported_for_queue_type};
+consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
+    %% Messages should include the offset as a custom header.
+    case check_queue_exists_in_local_node(Q) of
+        ok ->
+            #{no_ack := NoAck,
+              channel_pid := ChPid,
+              prefetch_count := ConsumerPrefetchCount,
+              consumer_tag := ConsumerTag,
+              exclusive_consume := ExclusiveConsume,
+              args := Args,
+              ok_msg := OkMsg} = Spec,
+            QName = amqqueue:get_name(Q),
+            %% The x-stream-offset argument selects the start position:
+            %% first | last | next, {timestamp, T}, or a numeric offset.
+            Offset = case rabbit_misc:table_lookup(Args, <<"x-stream-offset">>) of
+                         undefined ->
+                             next;
+                         {_, <<"first">>} ->
+                             first;
+                         {_, <<"last">>} ->
+                             last;
+                         {_, <<"next">>} ->
+                             next;
+                         {timestamp, V} ->
+                             {timestamp, V};
+                         {_, V} ->
+                             V
+                     end,
+            rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+                                                 not NoAck, QName,
+                                                 ConsumerPrefetchCount, false,
+                                                 up, Args),
+            %% FIXME: reply needs to be sent before the stream begins sending
+            %% really it should be sent by the stream queue process like classic queues
+            %% do
+            maybe_send_reply(ChPid, OkMsg),
+            QState = begin_stream(QState0, Q, ConsumerTag, Offset,
+                                  ConsumerPrefetchCount),
+            {ok, QState, []};
+        Err ->
+            Err
+    end.
+
+%% Resolves the pid of the stream member running on the local node:
+%% the leader when it is local, otherwise the first local replica.
+%% Crashes (badmatch) when no member runs locally — callers are
+%% expected to have checked locality beforehand.
+get_local_pid(#{leader_pid := Pid}) when node(Pid) =:= node() ->
+    Pid;
+get_local_pid(#{replica_pids := ReplicaPids}) ->
+    [Local | _] = [P || P <- ReplicaPids, node(P) =:= node()],
+    Local.
+
+%% Opens an osiris reader on the local stream member at Offset,
+%% registers an offset listener for new-data notifications and records
+%% the reader state under the consumer Tag. Max is both the initial
+%% credit and the credit cap for this reader.
+begin_stream(#stream_client{readers = Readers0} = State,
+             Q, Tag, Offset, Max) ->
+    LocalPid = get_local_pid(amqqueue:get_type_state(Q)),
+    {ok, Seg0} = osiris:init_reader(LocalPid, Offset),
+    NextOffset = osiris_log:next_offset(Seg0) - 1,
+    osiris:register_offset_listener(LocalPid, NextOffset),
+    %% TODO: avoid double calls to the same process
+    %% Only a raw numeric offset is kept as start_offset (used by
+    %% stream_entries/4 to filter already-read chunk entries); symbolic
+    %% specs start wherever the reader was resolved to.
+    StartOffset = case Offset of
+                      first -> NextOffset;
+                      last -> NextOffset;
+                      next -> NextOffset;
+                      {timestamp, _} -> NextOffset;
+                      _ -> Offset
+                  end,
+    Str0 = #stream{name = amqqueue:get_name(Q),
+                   credit = Max,
+                   start_offset = StartOffset,
+                   listening_offset = NextOffset,
+                   log = Seg0,
+                   max = Max},
+    State#stream_client{readers = Readers0#{Tag => Str0}}.
+
+%% Cancels a consumer: drops its reader state, updates core metrics,
+%% emits a consumer_deleted event and sends basic.cancel-ok (OkMsg)
+%% when one was requested. self() is the channel process here.
+cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
+                                                          name = QName} = State) ->
+    Readers = maps:remove(ConsumerTag, Readers0),
+    rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
+    rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag},
+                                           {channel, self()},
+                                           {queue, QName},
+                                           {user_who_performed_action, ActingUser}]),
+    maybe_send_reply(self(), OkMsg),
+    {ok, State#stream_client{readers = Readers}}.
+
+%% Grants Credit to consumer CTag, attempts immediate delivery from
+%% the local log, and — when Drain is true — consumes any remaining
+%% credit and emits a send_drained action. Returns the updated client
+%% state plus queue-type actions (credit reply, deliveries, drain).
+credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
+                                           name = Name,
+                                           leader = Leader} = State) ->
+    {Readers1, Msgs} = case Readers0 of
+                           #{CTag := #stream{credit = Credit0} = Str0} ->
+                               Str1 = Str0#stream{credit = Credit0 + Credit},
+                               {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+                               {Readers0#{CTag => Str}, Msgs0};
+                           _ ->
+                               {Readers0, []}
+                       end,
+    {Readers, Actions} =
+        case Drain of
+            true ->
+                case Readers1 of
+                    #{CTag := #stream{credit = Credit1} = Str2} ->
+                        %% Update Readers1, not the stale Readers0, so the
+                        %% reader state refreshed by stream_entries/3 above
+                        %% is retained alongside the zeroed credit.
+                        {Readers1#{CTag => Str2#stream{credit = 0}},
+                         [{send_drained, {CTag, Credit1}}]};
+                    _ ->
+                        {Readers1, []}
+                end;
+            false ->
+                {Readers1, []}
+        end,
+    {State#stream_client{readers = Readers}, [{send_credit_reply, length(Msgs)},
+                                              {deliver, CTag, true, Msgs}] ++ Actions}.
+
+%% Writes one delivery to every targeted stream queue, threading each
+%% queue's client state. Stateless targets are currently dropped (see
+%% the TODO); no actions are produced — confirms arrive later via the
+%% osiris_written event.
+deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
+    lists:foldl(
+      fun({_Q, stateless}, {Qs, Actions}) ->
+              %% TODO what do we do with stateless?
+              %% QRef = amqqueue:get_pid(Q),
+              %% ok = rabbit_fifo_client:untracked_enqueue(
+              %%      [QRef], Delivery#delivery.message),
+              {Qs, Actions};
+         ({Q, S0}, {Qs, Actions}) ->
+              S = deliver(Confirm, Delivery, S0),
+              {[{Q, S} | Qs], Actions}
+      end, {[], []}, QSs).
+
+%% Appends one message to the osiris log of a single stream. When the
+%% channel supplied a msg_seq_no, the osiris sequence number is mapped
+%% to it so the publish can be confirmed on osiris_written. Applies
+%% credit-flow blocking once more than soft_limit publishes are
+%% outstanding.
+deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
+        #stream_client{name = Name,
+                       leader = LeaderPid,
+                       next_seq = Seq,
+                       correlation = Correlation0,
+                       soft_limit = SftLmt,
+                       slow = Slow0} = State) ->
+    ok = osiris:write(LeaderPid, Seq, msg_to_iodata(Msg)),
+    Correlation = case MsgId of
+                      undefined ->
+                          %% no confirm requested; nothing to track
+                          Correlation0;
+                      _ when is_number(MsgId) ->
+                          Correlation0#{Seq => MsgId}
+                  end,
+    Slow = case maps:size(Correlation) >= SftLmt of
+               true when not Slow0 ->
+                   %% crossed the soft limit: block the publishing channel
+                   credit_flow:block(Name),
+                   true;
+               Bool ->
+                   Bool
+           end,
+    State#stream_client{next_seq = Seq + 1,
+                        correlation = Correlation,
+                        slow = Slow}.
+-spec dequeue(_, _, _, client()) -> no_return().
+%% basic.get is not supported on streams; always a protocol error.
+dequeue(_, _, _, #stream_client{name = Name}) ->
+    {protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
+     [rabbit_misc:rs(Name)]}.
+
+%% Handles asynchronous osiris events.
+%% osiris_written: writes up to Corrs are committed — settle the
+%% correlated publisher msg ids and unblock credit flow once the
+%% number of outstanding publishes drops back under soft_limit.
+%% osiris_offset: new data is available — run every reader forward and
+%% emit deliver actions per consumer tag.
+handle_event({osiris_written, From, Corrs}, State = #stream_client{correlation = Correlation0,
+                                                                   soft_limit = SftLmt,
+                                                                   slow = Slow0,
+                                                                   name = Name}) ->
+    MsgIds = maps:values(maps:with(Corrs, Correlation0)),
+    Correlation = maps:without(Corrs, Correlation0),
+    Slow = case maps:size(Correlation) < SftLmt of
+               true when Slow0 ->
+                   credit_flow:unblock(Name),
+                   false;
+               _ ->
+                   Slow0
+           end,
+    {ok, State#stream_client{correlation = Correlation,
+                             slow = Slow}, [{settled, From, MsgIds}]};
+handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader,
+                                                                   readers = Readers0,
+                                                                   name = Name}) ->
+    %% offset isn't actually needed as we use the atomic to read the
+    %% current committed
+    {Readers, TagMsgs} = maps:fold(
+                           fun (Tag, Str0, {Acc, TM}) ->
+                                   {Str, Msgs} = stream_entries(Name, Leader, Str0),
+                                   %% HACK for now, better to just return but
+                                   %% tricky with acks credits
+                                   %% that also evaluate the stream
+                                   % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
+                                   {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+                           end, {#{}, []}, Readers0),
+    Ack = true,
+    Deliveries = [{deliver, Tag, Ack, OffsetMsg}
+                  || {Tag, _LeaderPid, OffsetMsg} <- TagMsgs],
+    {ok, State#stream_client{readers = Readers}, Deliveries}.
+
+%% A stream queue is recoverable on this node when the node hosts the
+%% leader or any replica according to the queue's type state.
+is_recoverable(Q) ->
+    #{replica_nodes := Replicas,
+      leader_node := Leader} = amqqueue:get_type_state(Q),
+    lists:member(node(), [Leader | Replicas]).
+
+%% Recovers a vhost's stream queues on node start. Returns
+%% {Recovered, Failed}; the failed list is always empty here as
+%% recover/1 asserts success for each queue.
+recover(_VHost, Queues) ->
+    lists:foldl(
+      fun (Q0, {R0, F0}) ->
+              {ok, Q} = recover(Q0),
+              {[Q | R0], F0}
+      end, {[], []}, Queues).
+
+%% Acks (complete) return credit to the consumer's reader — one unit
+%% per settled message — and immediately try to deliver more entries.
+%% basic.nack / basic.reject are not supported on streams.
+settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
+                                              name = Name,
+                                              leader = Leader} = State) ->
+    Credit = length(MsgIds),
+    {Readers, Msgs} = case Readers0 of
+                          #{CTag := #stream{credit = Credit0} = Str0} ->
+                              Str1 = Str0#stream{credit = Credit0 + Credit},
+                              {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+                              {Readers0#{CTag => Str}, Msgs0};
+                          _ ->
+                              %% unknown consumer tag; nothing to do
+                              {Readers0, []}
+                      end,
+    {State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]};
+settle(_, _, _, #stream_client{name = Name}) ->
+    {protocol_error, not_implemented,
+     "basic.nack and basic.reject not supported by stream queues ~s",
+     [rabbit_misc:rs(Name)]}.
+
+%% Builds an {Item, Value} proplist for the requested info Items;
+%% the atom `all_items' expands to the full ?INFO_KEYS set.
+info(Q, all_items) ->
+    info(Q, ?INFO_KEYS);
+info(Q, Items) ->
+    [{Item, i(Item, Q)} || Item <- Items].
+
+%% Computes a single info item for a stream queue. Message counts are
+%% read from the queue_coarse_metrics ETS table (row layout per the
+%% matches below: {Name, Ready, Unacked, Total, _}); committed_offset
+%% comes from osiris writer counters. Unknown items yield ''.
+i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q);
+i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
+i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);
+i(arguments, Q) when ?is_amqqueue(Q) -> amqqueue:get_arguments(Q);
+i(leader, Q) when ?is_amqqueue(Q) ->
+    #{leader_node := Leader} = amqqueue:get_type_state(Q),
+    Leader;
+i(members, Q) when ?is_amqqueue(Q) ->
+    #{replica_nodes := Nodes} = amqqueue:get_type_state(Q),
+    Nodes;
+i(online, Q) ->
+    %% nodes whose member process answers an is_process_alive check
+    #{replica_pids := ReplicaPids,
+      leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
+    [node(P) || P <- ReplicaPids ++ [LeaderPid], rabbit_misc:is_process_alive(P)];
+i(state, Q) when ?is_amqqueue(Q) ->
+    %% TODO the coordinator should answer this, I guess??
+    running;
+i(messages, Q) when ?is_amqqueue(Q) ->
+    QName = amqqueue:get_name(Q),
+    case ets:lookup(queue_coarse_metrics, QName) of
+        [{_, _, _, M, _}] ->
+            M;
+        [] ->
+            %% no metrics published yet for this queue
+            0
+    end;
+i(messages_ready, Q) when ?is_amqqueue(Q) ->
+    QName = amqqueue:get_name(Q),
+    case ets:lookup(queue_coarse_metrics, QName) of
+        [{_, MR, _, _, _}] ->
+            MR;
+        [] ->
+            0
+    end;
+i(messages_unacknowledged, Q) when ?is_amqqueue(Q) ->
+    QName = amqqueue:get_name(Q),
+    case ets:lookup(queue_coarse_metrics, QName) of
+        [{_, _, MU, _, _}] ->
+            MU;
+        [] ->
+            0
+    end;
+i(committed_offset, Q) ->
+    %% TODO should it be on a metrics table?
+    %% NOTE(review): crashes (badkey) when no local osiris writer
+    %% counter exists for this queue — confirm callers only ask this
+    %% on the writer node.
+    Data = osiris_counters:overview(),
+    maps:get(committed_offset,
+             maps:get({osiris_writer, amqqueue:get_name(Q)}, Data));
+i(policy, Q) ->
+    case rabbit_policy:name(Q) of
+        none -> '';
+        Policy -> Policy
+    end;
+i(operator_policy, Q) ->
+    case rabbit_policy:name_op(Q) of
+        none -> '';
+        Policy -> Policy
+    end;
+i(effective_policy_definition, Q) ->
+    case rabbit_policy:effective_definition(Q) of
+        undefined -> [];
+        Def -> Def
+    end;
+i(type, _) ->
+    stream;
+i(_, _) ->
+    ''.
+
+%% Creates a fresh per-channel stream client for Q. The soft limit for
+%% outstanding publishes comes from the rabbit application env
+%% (stream_messages_soft_limit), which must be set.
+init(Q) when ?is_amqqueue(Q) ->
+    Leader = amqqueue:get_pid(Q),
+    {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
+    #stream_client{name = amqqueue:get_name(Q),
+                   leader = Leader,
+                   soft_limit = SoftLimit}.
+
+%% Closes every open osiris log held by this client's consumers.
+%% Always returns ok.
+close(#stream_client{readers = Readers}) ->
+    maps:fold(fun(_Tag, #stream{log = Log}, Acc) ->
+                      _ = osiris_log:close(Log),
+                      Acc
+              end, ok, Readers).
+
+%% Queue record updates need no client-side changes for streams.
+update(_, State) ->
+    State.
+
+%% No extra per-client state is exposed for recovery/info purposes.
+state_info(_) ->
+    #{}.
+
+%% Sets the max-age retention for queue Name in VHost. Policy is the
+%% textual age spec validated/converted by check_max_age; the result
+%% is stored into the queue's type state inside an mnesia transaction.
+set_retention_policy(Name, VHost, Policy) ->
+    case rabbit_amqqueue:check_max_age(Policy) of
+        {error, _} = E ->
+            E;
+        MaxAge ->
+            QName = rabbit_misc:r(VHost, queue, Name),
+            Fun = fun(Q) ->
+                          Conf = amqqueue:get_type_state(Q),
+                          amqqueue:set_type_state(Q, Conf#{max_age => MaxAge})
+                  end,
+            case rabbit_misc:execute_mnesia_transaction(
+                   fun() -> rabbit_amqqueue:update(QName, Fun) end) of
+                not_found ->
+                    {error, not_found};
+                _ ->
+                    ok
+            end
+    end.
+
+%% Adds a stream replica on Node for queue Name in VHost. Rejects
+%% classic/quorum queues and nodes that are not currently running in
+%% the cluster; otherwise delegates to the stream coordinator.
+add_replica(VHost, Name, Node) ->
+    QName = rabbit_misc:r(VHost, queue, Name),
+    case rabbit_amqqueue:lookup(QName) of
+        {ok, Q} when ?amqqueue_is_classic(Q) ->
+            {error, classic_queue_not_supported};
+        {ok, Q} when ?amqqueue_is_quorum(Q) ->
+            {error, quorum_queue_not_supported};
+        {ok, Q} when ?amqqueue_is_stream(Q) ->
+            case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
+                false ->
+                    {error, node_not_running};
+                true ->
+                    #{name := StreamId} = amqqueue:get_type_state(Q),
+                    {ok, Reply, _} = rabbit_stream_coordinator:add_replica(StreamId, Node),
+                    Reply
+            end;
+        E ->
+            %% lookup failure, e.g. {error, not_found}
+            E
+    end.
+
+%% Removes the stream replica on Node for queue Name in VHost.
+%% Mirrors add_replica/3: same queue-type and running-node checks,
+%% then delegates to the stream coordinator.
+delete_replica(VHost, Name, Node) ->
+    QName = rabbit_misc:r(VHost, queue, Name),
+    case rabbit_amqqueue:lookup(QName) of
+        {ok, Q} when ?amqqueue_is_classic(Q) ->
+            {error, classic_queue_not_supported};
+        {ok, Q} when ?amqqueue_is_quorum(Q) ->
+            {error, quorum_queue_not_supported};
+        {ok, Q} when ?amqqueue_is_stream(Q) ->
+            case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
+                false ->
+                    {error, node_not_running};
+                true ->
+                    #{name := StreamId} = amqqueue:get_type_state(Q),
+                    {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
+                    Reply
+            end;
+        E ->
+            E
+    end.
+
+%% Builds the osiris stream configuration for a new stream queue:
+%% resolves retention (max-length-bytes / max-age), segment size,
+%% leader locator and initial cluster size from args and policy, and
+%% selects replica nodes from the cluster (minus the leader Node).
+make_stream_conf(Node, Q) ->
+    QName = amqqueue:get_name(Q),
+    Name = queue_name(QName),
+    %% MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+    MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+    MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
+    MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
+    LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
+                                                            fun res_arg/2, Q)),
+    InitialClusterSize = initial_cluster_size(args_policy_lookup(<<"initial-cluster-size">>,
+                                                                 fun res_arg/2, Q)),
+    Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
+    %% minus one because Node itself hosts the leader
+    Replicas = select_stream_nodes(InitialClusterSize - 1, Replicas0),
+    Formatter = {?MODULE, format_osiris_event, [QName]},
+    %% keep only the retention settings that were actually configured
+    Retention = lists:filter(fun({_, R}) ->
+                                     R =/= undefined
+                             end, [{max_bytes, MaxBytes},
+                                   {max_age, MaxAge}]),
+    add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName,
+                                                       name => Name,
+                                                       retention => Retention,
+                                                       leader_locator_strategy => LeaderLocator,
+                                                       leader_node => Node,
+                                                       replica_nodes => Replicas,
+                                                       event_formatter => Formatter,
+                                                       epoch => 1}).
+
+%% Picks Size nodes from All to host stream members. When All has at
+%% most Size nodes they are all used; otherwise the local node is
+%% always included when present and the rest are chosen at random.
+select_stream_nodes(Size, All) when length(All) =< Size ->
+    All;
+select_stream_nodes(Size, All) ->
+    Node = node(),
+    case lists:member(Node, All) of
+        true ->
+            select_stream_nodes(Size - 1, lists:delete(Node, All), [Node]);
+        false ->
+            select_stream_nodes(Size, All, [])
+    end.
+
+%% Random-sampling accumulator for select_stream_nodes/2.
+select_stream_nodes(0, _, Selected) ->
+    Selected;
+select_stream_nodes(Size, Rest, Selected) ->
+    %% rand:uniform/1 is 1-based, matching lists:nth/2
+    S = lists:nth(rand:uniform(length(Rest)), Rest),
+    select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
+
+%% Recomputes the retention and segment-size parts of an existing
+%% stream Conf from the queue's current args/policies. Returns Conf
+%% unchanged when the queue can no longer be looked up.
+update_stream_conf(#{reference := QName} = Conf) ->
+    case rabbit_amqqueue:lookup(QName) of
+        {ok, Q} ->
+            MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+            MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
+            MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
+            Retention = lists:filter(fun({_, R}) ->
+                                             R =/= undefined
+                                     end, [{max_bytes, MaxBytes},
+                                           {max_age, MaxAge}]),
+            add_if_defined(max_segment_size, MaxSegmentSize, Conf#{retention => Retention});
+        _ ->
+            Conf
+    end.
+
+%% Stores Key => Value into Map unless Value is undefined, in which
+%% case Map is returned untouched. Used for optional config entries.
+add_if_defined(_Key, undefined, Map) ->
+    Map;
+add_if_defined(Key, Value, Map) ->
+    Map#{Key => Value}.
+
+%% Osiris event formatter (installed via the stream conf): wraps raw
+%% osiris events so they arrive at the channel as a queue_event cast.
+format_osiris_event(Evt, QRef) ->
+    {'$gen_cast', {queue_event, QRef, Evt}}.
+
+%% Normalises a max-age value: undefined passes through, a binary spec
+%% is converted via check_max_age, anything else is assumed already
+%% converted.
+max_age(undefined) ->
+    undefined;
+max_age(Bin) when is_binary(Bin) ->
+    rabbit_amqqueue:check_max_age(Bin);
+max_age(Age) ->
+    Age.
+
+%% Conflict resolver for args_policy_lookup: the stricter (smaller)
+%% of two max-age specs wins.
+max_age(Age1, Age2) ->
+    min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)).
+
+%% Leader locator strategy, defaulting to client-local placement.
+queue_leader_locator(undefined) -> <<"client-local">>;
+queue_leader_locator(Val) -> Val.
+
+%% Initial cluster size defaults to the number of running nodes.
+initial_cluster_size(undefined) ->
+    length(rabbit_mnesia:cluster_nodes(running));
+initial_cluster_size(Val) ->
+    Val.
+
+%% Conflict resolver for args_policy_lookup: a queue argument takes
+%% precedence over a policy value.
+res_arg(PolVal, undefined) -> PolVal;
+res_arg(_, ArgVal) -> ArgVal.
+
+%% Derives a filesystem/osiris-safe stream name from the queue
+%% resource. A timestamp is appended, so each declaration yields a
+%% distinct internal name for the same queue.
+queue_name(#resource{virtual_host = VHost, name = Name}) ->
+    Timestamp = erlang:integer_to_binary(erlang:system_time()),
+    osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
+                                                     Timestamp/binary>>)).
+
+%% Per-queue recovery: kicks the stream coordinator (which restarts
+%% stream members as needed) and returns the queue record unchanged.
+recover(Q) ->
+    rabbit_stream_coordinator:recover(),
+    {ok, Q}.
+
+%% Consumers must attach to a stream member on their own node;
+%% verifies this node holds the leader or a replica of Q. The error
+%% message previously read "does not a have a replica" — fixed.
+check_queue_exists_in_local_node(Q) ->
+    #{leader_node := Leader,
+      replica_nodes := Replicas} = amqqueue:get_type_state(Q),
+    case lists:member(node(), [Leader | Replicas]) of
+        true ->
+            ok;
+        false ->
+            {protocol_error, precondition_failed,
+             "queue '~s' does not have a replica on the local node",
+             [rabbit_misc:rs(amqqueue:get_name(Q))]}
+    end.
+
+%% Sends a protocol reply (e.g. basic.consume-ok) to the channel
+%% unless no reply was requested (undefined).
+maybe_send_reply(_ChPid, undefined) -> ok;
+maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
+
+%% Reads as many log entries as the reader's credit allows, returning
+%% {UpdatedStream, Msgs} where each Msg is {QName, LeaderPid, Offset,
+%% Redelivered = false, BasicMsg} with an x-stream-offset header added.
+stream_entries(Name, Id, Str) ->
+    stream_entries(Name, Id, Str, []).
+
+stream_entries(Name, LeaderPid,
+               #stream{name = QName,
+                       credit = Credit,
+                       start_offset = StartOffs,
+                       listening_offset = LOffs,
+                       log = Seg0} = Str0, MsgIn)
+  when Credit > 0 ->
+    case osiris_log:read_chunk_parsed(Seg0) of
+        {end_of_stream, Seg} ->
+            NextOffset = osiris_log:next_offset(Seg),
+            %% re-register the offset listener only if we have advanced
+            %% past the offset we last listened on
+            case NextOffset > LOffs of
+                true ->
+                    osiris:register_offset_listener(LeaderPid, NextOffset),
+                    {Str0#stream{log = Seg,
+                                 listening_offset = NextOffset}, MsgIn};
+                false ->
+                    {Str0#stream{log = Seg}, MsgIn}
+            end;
+        {Records, Seg} ->
+            %% entries below start_offset were requested to be skipped
+            %% by the consumer's x-stream-offset and are filtered out
+            Msgs = [begin
+                        Msg0 = binary_to_msg(QName, B),
+                        Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
+                                                      long, O, Msg0),
+                        {Name, LeaderPid, O, false, Msg}
+                    end || {O, B} <- Records,
+                           O >= StartOffs],
+
+            NumMsgs = length(Msgs),
+
+            Str = Str0#stream{credit = Credit - NumMsgs,
+                              log = Seg},
+            case Str#stream.credit < 1 of
+                true ->
+                    %% we are done here
+                    {Str, MsgIn ++ Msgs};
+                false ->
+                    %% if there are fewer Msgs than Entries0 it means there were non-events
+                    %% in the log and we should recurse and try again
+                    stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+            end
+    end;
+%% no credit left: stop reading and return what we have
+stream_entries(_Name, _Id, Str, Msgs) ->
+    {Str, Msgs}.
+
+%% Decodes a stored stream entry back into a #basic_message{}. The
+%% original exchange and routing key are recovered from message
+%% annotations when present; class_id 60 is the AMQP 0-9-1 `basic'
+%% content class.
+binary_to_msg(#resource{virtual_host = VHost,
+                        kind = queue,
+                        name = QName}, Data) ->
+    R0 = rabbit_msg_record:init(Data),
+    %% if the message annotation isn't present the data most likely came from
+    %% the rabbitmq-stream plugin so we'll choose defaults that simulate use
+    %% of the direct exchange
+    {utf8, Exchange} = rabbit_msg_record:message_annotation(<<"x-exchange">>,
+                                                            R0, {utf8, <<>>}),
+    {utf8, RoutingKey} = rabbit_msg_record:message_annotation(<<"x-routing-key">>,
+                                                              R0, {utf8, QName}),
+    {Props, Payload} = rabbit_msg_record:to_amqp091(R0),
+    XName = #resource{kind = exchange,
+                      virtual_host = VHost,
+                      name = Exchange},
+    Content = #content{class_id = 60,
+                       properties = Props,
+                       properties_bin = none,
+                       payload_fragments_rev = [Payload]},
+    {ok, Msg} = rabbit_basic:message(XName, RoutingKey, Content),
+    Msg.
+
+
+%% Encodes a #basic_message{} for storage in the osiris log, stashing
+%% the exchange name and first routing key as message annotations so
+%% binary_to_msg/2 can reconstruct them on read.
+msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
+                             routing_keys = [RKey | _],
+                             content = Content}) ->
+    #content{properties = Props,
+             payload_fragments_rev = Payload} =
+        rabbit_binary_parser:ensure_content_decoded(Content),
+    %% payload fragments are stored reversed; restore order first
+    R0 = rabbit_msg_record:from_amqp091(Props, lists:reverse(Payload)),
+    %% TODO durable?
+    R = rabbit_msg_record:add_message_annotations(
+          #{<<"x-exchange">> => {utf8, Exchange},
+            <<"x-routing-key">> => {utf8, RKey}}, R0),
+    rabbit_msg_record:to_iodata(R).
+
+%% Declares which policies and queue/consumer arguments this queue
+%% type understands; used by the queue-type framework for validation.
+capabilities() ->
+    #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
+                   <<"queue-leader-locator">>, <<"initial-cluster-size">>],
+      queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
+                          <<"x-max-length">>, <<"x-max-length-bytes">>,
+                          <<"x-single-active-consumer">>, <<"x-queue-type">>,
+                          <<"x-max-age">>, <<"x-max-segment-size">>,
+                          <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
+      consumer_arguments => [<<"x-stream-offset">>],
+      server_named => false}.