diff options
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 36 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_exchange.erl | 14 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 26 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_virtual_queue.erl | 220 |
6 files changed, 277 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 0ea28a0592..88f8759c47 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -87,7 +87,7 @@ -export_type([name/0, qmsg/0, absent_reason/0]). --type name() :: rabbit_types:r('queue'). +-type name() :: rabbit_types:r('queue') | rabbit_types:r('virtual_queue'). -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 828e35b803..5034486d3e 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1399,7 +1399,10 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, consumer_tag = CTag0, no_ack = NoAck, nowait = NoWait}, - _, State = #ch{reply_consumer = ReplyConsumer, + _, State = #ch{cfg = #conf{user = User, + virtual_host = VHostPath}, + reply_consumer = ReplyConsumer, + queue_states = QStates0, consumer_mapping = ConsumerMapping}) -> case maps:find(CTag0, ConsumerMapping) of error -> @@ -1410,10 +1413,23 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, rabbit_guid:gen_secure(), "amq.ctag"); Other -> Other end, - %% Precalculate both suffix and key - {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()), + TmpQName = #resource{name = <<"amq.rabbitmq.reply-to">>, + kind = queue, + virtual_host = VHostPath}, + Q = rabbit_virtual_queue:create_amqqueue(TmpQName), + %% suffix is pre-calculated by create_amqqueue + #{key := Key, + suffix := Suffix} = amqqueue:get_type_state(Q), Consumer = {CTag, Suffix, Key}, - State1 = State#ch{reply_consumer = Consumer}, + Spec = #{no_ack => true, + channel_pid => self(), + consumer_tag => CTag, + ok_msg => undefined, + acting_user => User}, + {ok, QStates1, Actions} = rabbit_queue_type:consume(Q, Spec, QStates0), + State1 = handle_queue_actions(Actions, + State#ch{reply_consumer = Consumer, + queue_states = QStates1}), case NoWait of true -> {noreply, State1}; false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, @@ -2174,15 +2190,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex confirm = Confirm, msg_seq_no = MsgSeqNo}, RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue - Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames), + Qs0 = rabbit_queue_type:lookup(RoutedToQueueNames), Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0), - QueueNames = lists:map(fun amqqueue:get_name/1, Qs), case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of - {ok, QueueStates, Actions} -> + {ok, QueueStates, Actions} -> rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))), %% NB: the order here is important since basic.returns must be %% sent before confirms. - ok = process_routing_mandatory(Mandatory, Qs, Message, State0), + QueueNames = lists:map(fun amqqueue:get_name/1, Qs), + ok = process_routing_mandatory(Mandatory, QueueNames, Message, State0), State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes @@ -2242,13 +2258,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex end. process_routing_mandatory(_Mandatory = true, - _RoutedToQs = [], + _RoutedToQNames = [], Msg, State) -> rabbit_global_counters:messages_unroutable_returned(amqp091, 1), ok = basic_return(Msg, State, no_route), ok; process_routing_mandatory(_Mandatory = false, - _RoutedToQs = [], + _RoutedToQNames = [], #basic_message{exchange_name = ExchangeName}, State) -> rabbit_global_counters:messages_unroutable_dropped(amqp091, 1), ?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State), diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index dc0c53275a..659d410fc5 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -179,7 +179,7 @@ consume(Q, Spec, State) when ?amqqueue_is_classic(Q) -> exclusive_consume := ExclusiveConsume, args := Args, ok_msg := OkMsg, - acting_user := ActingUser} = Spec, + acting_user := ActingUser} = Spec, case delegate:invoke(QPid, {gen_server2, call, [{basic_consume, NoAck, ChPid, LimiterPid, diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 84ce55e742..7ca1e7fb2b 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -7,7 +7,6 @@ -module(rabbit_exchange). -include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit_common/include/rabbit_framing.hrl"). -export([recover/1, policy_changed/2, callback/4, declare/7, assert_equivalence/6, assert_args_equivalence/2, check_type/1, exists/1, @@ -405,27 +404,20 @@ info_all(VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)). --spec route(rabbit_types:exchange(), rabbit_types:delivery()) - -> [rabbit_amqqueue:name()]. - +-spec route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name()]. route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> case RName of <<>> -> RKsSorted = lists:usort(RKs), - [rabbit_channel:deliver_reply(RK, Delivery) || - RK <- RKsSorted, virtual_reply_queue(RK)], - [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, - not virtual_reply_queue(RK)]; + [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted]; _ -> Decs = rabbit_exchange_decorator:select(route, Decorators), lists:usort(route1(Delivery, Decs, {[X], XName, []})) end. -virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true; -virtual_reply_queue(_) -> false. - route1(_, _, {[], _, QNames}) -> QNames; route1(Delivery, Decorators, diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 04c09c2efd..33e61e55f3 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -47,6 +47,10 @@ notify_decorators/1 ]). +-export([ + lookup/1 + ]). + -type queue_name() :: rabbit_types:r(queue). -type queue_ref() :: queue_name() | atom(). -type queue_state() :: term(). @@ -352,6 +356,28 @@ notify_decorators(Q) -> Mod = amqqueue:get_type(Q), Mod:notify_decorators(Q). +-spec lookup([rabbit_amqqueue:name()]) -> + [amqqueue:amqqueue()]. +lookup(QNames) when is_list(QNames) -> + lookup0(QNames, []). + +lookup0([], Acc) -> + Acc; +lookup0([#resource{kind = queue} = Name | Rem], Acc) -> + case rabbit_virtual_queue:is_virtual(Name) of + false -> + case ets:lookup(rabbit_queue, Name) of + [] -> + lookup0(Rem, Acc); + [Q] -> + lookup0(Rem, [Q | Acc]) + end; + true -> + %% virtual queues are not persisted, + %% create a temporary amqqueue record here + lookup0(Rem, [rabbit_virtual_queue:create_amqqueue(Name) | Acc]) + end. + -spec init() -> state(). init() -> #?STATE{}. diff --git a/deps/rabbit/src/rabbit_virtual_queue.erl b/deps/rabbit/src/rabbit_virtual_queue.erl new file mode 100644 index 0000000000..f4a5b03354 --- /dev/null +++ b/deps/rabbit/src/rabbit_virtual_queue.erl @@ -0,0 +1,220 @@ +-module(rabbit_virtual_queue). +-behaviour(rabbit_queue_type). + +-include("amqqueue.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(STATE, ?MODULE). +-record(?STATE, {pid :: undefined | pid(), %% the current master pid + consumer_tag :: term(), + key :: binary(), + qref :: term()}). + + +-opaque state() :: #?STATE{}. + +-export_type([state/0]). + +-export([ + is_enabled/0, + declare/2, + delete/4, + is_recoverable/1, + recover/2, + purge/1, + policy_changed/1, + stat/1, + init/1, + close/1, + update/2, + consume/3, + cancel/5, + handle_event/2, + deliver/2, + settle/4, + credit/4, + dequeue/4, + info/2, + state_info/1, + capabilities/0, + notify_decorators/1 + ]). + +-export([ + is_virtual/1, + create_amqqueue/1 + ]). + + +-spec is_virtual(binary() | rabbit_amqqueue:name()) -> boolean(). +is_virtual(<<"amq.rabbitmq.reply-to.", _/binary>>) -> + true; +is_virtual(#resource{name = Name}) -> + is_virtual(Name); +is_virtual(_) -> + false. + +create_amqqueue(#resource{name = <<"amq.rabbitmq.reply-to">>, + virtual_host = VHost}) -> + {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()), + Name = <<"amq.rabbitmq.reply-to.", Suffix/binary>>, + QName = #resource{name = Name, + virtual_host = VHost, + kind = queue}, + Q = amqqueue:new(QName, + self(), + false, + false, + none, + [], + VHost, + #{user => <<"internal">>}, + ?MODULE), + TS = amqqueue:get_type_state(Q), + amqqueue:set_type_state(Q, TS#{key => Key, + suffix => Suffix}); +create_amqqueue(#resource{name = Name, + virtual_host = VHost, + kind = queue} = QName) -> + {Pid, Key} = pid_and_key_from_name(Name), + Q = amqqueue:new(QName, + Pid, + false, + false, + none, + [], + VHost, + #{user => <<"internal">>}, + ?MODULE), + TS = amqqueue:get_type_state(Q), + amqqueue:set_type_state(Q, TS#{key => Key}). + +pid_and_key_from_name(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> + case rabbit_direct_reply_to:decode_reply_to_v2( + EncodedBin, rabbit_nodes:all_running_with_hashes()) of + {ok, Pid, Key} -> + {Pid, Key}; + {error, _} -> + {ok, Pid, Key} = rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin), + {Pid, Key} + end. + +notify_decorators(_) -> + ok. + +is_enabled() -> true. + +declare(Q, _Node) -> + {new, Q}. + +delete(_Q, _IfUnused, _IfEmpty, _ActingUser) -> + {ok, 0}. + +is_recoverable(_Q) -> + false. + +recover(_VHost, _Queues) -> + {[], []}. + +-spec policy_changed(amqqueue:amqqueue()) -> ok. +policy_changed(_Q) -> + ok. + +stat(_Q) -> + {ok, 0, 0}. + +-spec init(amqqueue:amqqueue()) -> + {ok, state()}. +init(Q) -> + QName = amqqueue:get_name(Q), + #{key := Key} = amqqueue:get_type_state(Q), + {ok, #?STATE{pid = amqqueue:get_pid(Q), + key = Key, + qref = QName}}. + +-spec close(state()) -> ok. +close(_State) -> + ok. + +-spec update(amqqueue:amqqueue(), state()) -> state(). +update(_Q, #?STATE{} = State) -> + State. + +consume(_Q, #{consumer_tag := CTag}, State) -> + {ok, State#?MODULE{consumer_tag = CTag}, []}. + +cancel(_Q, _ConsumerTag, _OkMsg, _ActingUser, State) -> + {ok, State}. + +-spec settle(rabbit_queue_type:settle_op(), rabbit_types:ctag(), + [non_neg_integer()], state()) -> + {state(), rabbit_queue_type:actions()}. +settle(_Op, _CTag, _MsgIds, State) -> + {State, []}. + +credit(_CTag, _Credit, _Drain, State) -> + {State, []}. + +handle_event({down, Pid, _Info}, #?STATE{pid = Pid} = _State0) -> + eol; +handle_event({deliver_reply, Key, #delivery{message = Message}}, + #?STATE{qref = QName, + consumer_tag = CTag, + key = Key} = State) -> + Msg = {QName, self(), 0, false, Message}, + {ok, State, [{deliver, CTag, false, [Msg]}]}. + + +-spec deliver([{amqqueue:amqqueue(), state()}], Delivery :: term()) -> + {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}. +deliver(Qs0, #delivery{} = Delivery) -> + Actions = [begin + Pid = amqqueue:get_pid(Q), + #{key := Key} = amqqueue:get_type_state(Q), + Evt = {queue_event, QRef, {deliver_reply, Key, Delivery}}, + gen_server2:cast(Pid, Evt), + {monitor, process, Pid} + end || {Q, #?MODULE{qref = QRef}} <- Qs0], + {Qs0, Actions}. + + +-spec dequeue(NoAck :: boolean(), LimiterPid :: pid(), + rabbit_types:ctag(), state()) -> + {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} | + {empty, state()}. +dequeue(_NoAck, _LimiterPid, _CTag, State) -> + {empty, State}. + +-spec state_info(state()) -> #{atom() := term()}. +state_info(_State) -> + #{}. + +%% general queue info +-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) -> + rabbit_types:infos(). +info(_Q, _Items) -> + []. + +-spec purge(amqqueue:amqqueue()) -> + {ok, non_neg_integer()}. +purge(Q) when ?is_amqqueue(Q) -> + {ok, 0}. + +%% internal-ish + +capabilities() -> + #{unsupported_policies => [%% Stream policies + <<"max-age">>, <<"stream-max-segment-size-bytes">>, + <<"queue-leader-locator">>, <<"initial-cluster-size">>, + %% Quorum policies + <<"delivery-limit">>, <<"dead-letter-strategy">>], + queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>, + <<"x-dead-letter-routing-key">>, <<"x-max-length">>, + <<"x-max-length-bytes">>, <<"x-max-priority">>, + <<"x-overflow">>, <<"x-queue-mode">>, <<"x-queue-version">>, + <<"x-single-active-consumer">>, <<"x-queue-type">>, + <<"x-queue-master-locator">>], + consumer_arguments => [<<"x-cancel-on-ha-failover">>, + <<"x-priority">>, <<"x-credit">>], + server_named => true}. + |