summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl2
-rw-r--r--deps/rabbit/src/rabbit_channel.erl36
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl2
-rw-r--r--deps/rabbit/src/rabbit_exchange.erl14
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl26
-rw-r--r--deps/rabbit/src/rabbit_virtual_queue.erl220
6 files changed, 277 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 0ea28a0592..88f8759c47 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -87,7 +87,7 @@
-export_type([name/0, qmsg/0, absent_reason/0]).
--type name() :: rabbit_types:r('queue').
+-type name() :: rabbit_types:r('queue') | rabbit_types:r('virtual_queue').
-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 828e35b803..5034486d3e 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1399,7 +1399,10 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
consumer_tag = CTag0,
no_ack = NoAck,
nowait = NoWait},
- _, State = #ch{reply_consumer = ReplyConsumer,
+ _, State = #ch{cfg = #conf{user = User,
+ virtual_host = VHostPath},
+ reply_consumer = ReplyConsumer,
+ queue_states = QStates0,
consumer_mapping = ConsumerMapping}) ->
case maps:find(CTag0, ConsumerMapping) of
error ->
@@ -1410,10 +1413,23 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
rabbit_guid:gen_secure(), "amq.ctag");
Other -> Other
end,
- %% Precalculate both suffix and key
- {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
+ TmpQName = #resource{name = <<"amq.rabbitmq.reply-to">>,
+ kind = queue,
+ virtual_host = VHostPath},
+ Q = rabbit_virtual_queue:create_amqqueue(TmpQName),
+ %% suffix is pre-calculated by create_amqqueue
+ #{key := Key,
+ suffix := Suffix} = amqqueue:get_type_state(Q),
Consumer = {CTag, Suffix, Key},
- State1 = State#ch{reply_consumer = Consumer},
+ Spec = #{no_ack => true,
+ channel_pid => self(),
+ consumer_tag => CTag,
+ ok_msg => undefined,
+ acting_user => User},
+ {ok, QStates1, Actions} = rabbit_queue_type:consume(Q, Spec, QStates0),
+ State1 = handle_queue_actions(Actions,
+ State#ch{reply_consumer = Consumer,
+ queue_states = QStates1}),
case NoWait of
true -> {noreply, State1};
false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
@@ -2174,15 +2190,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
- Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
+ Qs0 = rabbit_queue_type:lookup(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
- QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
- {ok, QueueStates, Actions} ->
+ {ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
+ QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
+ ok = process_routing_mandatory(Mandatory, QueueNames, Message, State0),
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
@@ -2242,13 +2258,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
end.
process_routing_mandatory(_Mandatory = true,
- _RoutedToQs = [],
+ _RoutedToQNames = [],
Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
ok = basic_return(Msg, State, no_route),
ok;
process_routing_mandatory(_Mandatory = false,
- _RoutedToQs = [],
+ _RoutedToQNames = [],
#basic_message{exchange_name = ExchangeName}, State) ->
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index dc0c53275a..659d410fc5 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -179,7 +179,7 @@ consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
exclusive_consume := ExclusiveConsume,
args := Args,
ok_msg := OkMsg,
- acting_user := ActingUser} = Spec,
+ acting_user := ActingUser} = Spec,
case delegate:invoke(QPid,
{gen_server2, call,
[{basic_consume, NoAck, ChPid, LimiterPid,
diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl
index 84ce55e742..7ca1e7fb2b 100644
--- a/deps/rabbit/src/rabbit_exchange.erl
+++ b/deps/rabbit/src/rabbit_exchange.erl
@@ -7,7 +7,6 @@
-module(rabbit_exchange).
-include_lib("rabbit_common/include/rabbit.hrl").
--include_lib("rabbit_common/include/rabbit_framing.hrl").
-export([recover/1, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1, exists/1,
@@ -405,27 +404,20 @@ info_all(VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)).
--spec route(rabbit_types:exchange(), rabbit_types:delivery())
- -> [rabbit_amqqueue:name()].
-
+-spec route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name()].
route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
case RName of
<<>> ->
RKsSorted = lists:usort(RKs),
- [rabbit_channel:deliver_reply(RK, Delivery) ||
- RK <- RKsSorted, virtual_reply_queue(RK)],
- [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
- not virtual_reply_queue(RK)];
+ [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted];
_ ->
Decs = rabbit_exchange_decorator:select(route, Decorators),
lists:usort(route1(Delivery, Decs, {[X], XName, []}))
end.
-virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
-virtual_reply_queue(_) -> false.
-
route1(_, _, {[], _, QNames}) ->
QNames;
route1(Delivery, Decorators,
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 04c09c2efd..33e61e55f3 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -47,6 +47,10 @@
notify_decorators/1
]).
+-export([
+ lookup/1
+ ]).
+
-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
-type queue_state() :: term().
@@ -352,6 +356,28 @@ notify_decorators(Q) ->
Mod = amqqueue:get_type(Q),
Mod:notify_decorators(Q).
+-spec lookup([rabbit_amqqueue:name()]) ->
+ [amqqueue:amqqueue()].
+lookup(QNames) when is_list(QNames) ->
+ lookup0(QNames, []).
+
+lookup0([], Acc) ->
+ Acc;
+lookup0([#resource{kind = queue} = Name | Rem], Acc) ->
+ case rabbit_virtual_queue:is_virtual(Name) of
+ false ->
+ case ets:lookup(rabbit_queue, Name) of
+ [] ->
+ lookup0(Rem, Acc);
+ [Q] ->
+ lookup0(Rem, [Q | Acc])
+ end;
+ true ->
+ %% virtual queues are not persisted,
+ %% create a temporary amqqueue record here
+ lookup0(Rem, [rabbit_virtual_queue:create_amqqueue(Name) | Acc])
+ end.
+
-spec init() -> state().
init() ->
#?STATE{}.
diff --git a/deps/rabbit/src/rabbit_virtual_queue.erl b/deps/rabbit/src/rabbit_virtual_queue.erl
new file mode 100644
index 0000000000..f4a5b03354
--- /dev/null
+++ b/deps/rabbit/src/rabbit_virtual_queue.erl
@@ -0,0 +1,220 @@
+-module(rabbit_virtual_queue).
+-behaviour(rabbit_queue_type).
+
+-include("amqqueue.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-define(STATE, ?MODULE).
+-record(?STATE, {pid :: undefined | pid(), %% the current master pid
+ consumer_tag :: term(),
+ key :: binary(),
+ qref :: term()}).
+
+
+-opaque state() :: #?STATE{}.
+
+-export_type([state/0]).
+
+-export([
+ is_enabled/0,
+ declare/2,
+ delete/4,
+ is_recoverable/1,
+ recover/2,
+ purge/1,
+ policy_changed/1,
+ stat/1,
+ init/1,
+ close/1,
+ update/2,
+ consume/3,
+ cancel/5,
+ handle_event/2,
+ deliver/2,
+ settle/4,
+ credit/4,
+ dequeue/4,
+ info/2,
+ state_info/1,
+ capabilities/0,
+ notify_decorators/1
+ ]).
+
+-export([
+ is_virtual/1,
+ create_amqqueue/1
+ ]).
+
+
+-spec is_virtual(binary() | rabbit_amqqueue:name()) -> boolean().
+is_virtual(<<"amq.rabbitmq.reply-to.", _/binary>>) ->
+ true;
+is_virtual(#resource{name = Name}) ->
+ is_virtual(Name);
+is_virtual(_) ->
+ false.
+
+create_amqqueue(#resource{name = <<"amq.rabbitmq.reply-to">>,
+ virtual_host = VHost}) ->
+ {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
+ Name = <<"amq.rabbitmq.reply-to.", Suffix/binary>>,
+ QName = #resource{name = Name,
+ virtual_host = VHost,
+ kind = queue},
+ Q = amqqueue:new(QName,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{user => <<"internal">>},
+ ?MODULE),
+ TS = amqqueue:get_type_state(Q),
+ amqqueue:set_type_state(Q, TS#{key => Key,
+ suffix => Suffix});
+create_amqqueue(#resource{name = Name,
+ virtual_host = VHost,
+ kind = queue} = QName) ->
+ {Pid, Key} = pid_and_key_from_name(Name),
+ Q = amqqueue:new(QName,
+ Pid,
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{user => <<"internal">>},
+ ?MODULE),
+ TS = amqqueue:get_type_state(Q),
+ amqqueue:set_type_state(Q, TS#{key => Key}).
+
+pid_and_key_from_name(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
+ case rabbit_direct_reply_to:decode_reply_to_v2(
+ EncodedBin, rabbit_nodes:all_running_with_hashes()) of
+ {ok, Pid, Key} ->
+ {Pid, Key};
+ {error, _} ->
+ {ok, Pid, Key} = rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin),
+ {Pid, Key}
+ end.
+
+notify_decorators(_) ->
+ ok.
+
+is_enabled() -> true.
+
+declare(Q, _Node) ->
+ {new, Q}.
+
+delete(_Q, _IfUnused, _IfEmpty, _ActingUser) ->
+ {ok, 0}.
+
+is_recoverable(_Q) ->
+ false.
+
+recover(_VHost, _Queues) ->
+ {[], []}.
+
+-spec policy_changed(amqqueue:amqqueue()) -> ok.
+policy_changed(_Q) ->
+ ok.
+
+stat(_Q) ->
+ {ok, 0, 0}.
+
+-spec init(amqqueue:amqqueue()) ->
+ {ok, state()}.
+init(Q) ->
+ QName = amqqueue:get_name(Q),
+ #{key := Key} = amqqueue:get_type_state(Q),
+ {ok, #?STATE{pid = amqqueue:get_pid(Q),
+ key = Key,
+ qref = QName}}.
+
+-spec close(state()) -> ok.
+close(_State) ->
+ ok.
+
+-spec update(amqqueue:amqqueue(), state()) -> state().
+update(_Q, #?STATE{} = State) ->
+ State.
+
+consume(_Q, #{consumer_tag := CTag}, State) ->
+ {ok, State#?MODULE{consumer_tag = CTag}, []}.
+
+cancel(_Q, _ConsumerTag, _OkMsg, _ActingUser, State) ->
+ {ok, State}.
+
+-spec settle(rabbit_queue_type:settle_op(), rabbit_types:ctag(),
+ [non_neg_integer()], state()) ->
+ {state(), rabbit_queue_type:actions()}.
+settle(_Op, _CTag, _MsgIds, State) ->
+ {State, []}.
+
+credit(_CTag, _Credit, _Drain, State) ->
+ {State, []}.
+
+handle_event({down, Pid, _Info}, #?STATE{pid = Pid} = _State0) ->
+ eol;
+handle_event({deliver_reply, Key, #delivery{message = Message}},
+ #?STATE{qref = QName,
+ consumer_tag = CTag,
+ key = Key} = State) ->
+ Msg = {QName, self(), 0, false, Message},
+ {ok, State, [{deliver, CTag, false, [Msg]}]}.
+
+
+-spec deliver([{amqqueue:amqqueue(), state()}], Delivery :: term()) ->
+ {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
+deliver(Qs0, #delivery{} = Delivery) ->
+ Actions = [begin
+ Pid = amqqueue:get_pid(Q),
+ #{key := Key} = amqqueue:get_type_state(Q),
+ Evt = {queue_event, QRef, {deliver_reply, Key, Delivery}},
+ gen_server2:cast(Pid, Evt),
+ {monitor, process, Pid}
+ end || {Q, #?MODULE{qref = QRef}} <- Qs0],
+ {Qs0, Actions}.
+
+
+-spec dequeue(NoAck :: boolean(), LimiterPid :: pid(),
+ rabbit_types:ctag(), state()) ->
+ {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} |
+ {empty, state()}.
+dequeue(_NoAck, _LimiterPid, _CTag, State) ->
+ {empty, State}.
+
+-spec state_info(state()) -> #{atom() := term()}.
+state_info(_State) ->
+ #{}.
+
+%% general queue info
+-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+info(_Q, _Items) ->
+ [].
+
+-spec purge(amqqueue:amqqueue()) ->
+ {ok, non_neg_integer()}.
+purge(Q) when ?is_amqqueue(Q) ->
+ {ok, 0}.
+
+%% internal-ish
+
+capabilities() ->
+ #{unsupported_policies => [%% Stream policies
+ <<"max-age">>, <<"stream-max-segment-size-bytes">>,
+ <<"queue-leader-locator">>, <<"initial-cluster-size">>,
+ %% Quorum policies
+ <<"delivery-limit">>, <<"dead-letter-strategy">>],
+ queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-priority">>,
+ <<"x-overflow">>, <<"x-queue-mode">>, <<"x-queue-version">>,
+ <<"x-single-active-consumer">>, <<"x-queue-type">>,
+ <<"x-queue-master-locator">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>,
+ <<"x-priority">>, <<"x-credit">>],
+ server_named => true}.
+