diff options
-rw-r--r-- | deps/rabbit/src/rabbit_auth_backend_internal.erl | 23 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_khepri.erl | 211 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_store.erl | 187 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_vhost.erl | 29 | ||||
-rw-r--r-- | deps/rabbit/src/vhost.erl | 10 | ||||
-rw-r--r-- | workspace_helpers.bzl | 17 |
6 files changed, 361 insertions, 116 deletions
diff --git a/deps/rabbit/src/rabbit_auth_backend_internal.erl b/deps/rabbit/src/rabbit_auth_backend_internal.erl index 85c6e429d5..e31341ba35 100644 --- a/deps/rabbit/src/rabbit_auth_backend_internal.erl +++ b/deps/rabbit/src/rabbit_auth_backend_internal.erl @@ -251,12 +251,16 @@ check_resource_access_in_mnesia(Username, VHostPath, Name, Permission) -> end. check_resource_access_in_khepri(Username, VHostPath, Name, Permission) -> - Path = khepri_user_permission_path(Username, VHostPath), - case rabbit_khepri:get(Path) of - {ok, #user_permission{permission = P}} -> - do_check_resource_access(Name, Permission, P); - _ -> - false + UserVHost = #user_vhost{username = Username, + virtual_host = VHostPath}, + try + P = ets:lookup_element( + rabbit_khepri_user_permissions, + UserVHost, + #user_permission.permission), + do_check_resource_access(Name, Permission, P) + catch + _:_:_ -> false end. do_check_resource_access(Name, Permission, P) -> @@ -521,10 +525,9 @@ lookup_user_in_mnesia(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). lookup_user_in_khepri(Username) -> - Path = khepri_user_path(Username), - case rabbit_khepri:get(Path) of - {ok, User} -> {ok, User}; - _ -> {error, not_found} + case ets:lookup(rabbit_khepri_users, Username) of + [User] -> {ok, User}; + _ -> {error, not_found} end. -spec exists(rabbit_types:username()) -> boolean(). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index a2872522d5..e1372c3168 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -12,6 +12,7 @@ -include_lib("khepri/include/khepri.hrl"). -include_lib("rabbit_common/include/logging.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([setup/0, setup/1, @@ -99,6 +100,7 @@ setup(_) -> friendly_name => ?RA_FRIENDLY_NAME}, case khepri:start(?RA_SYSTEM, RaServerConfig) of {ok, ?STORE_ID} -> + register_projections(), ?LOG_DEBUG( "Khepri-based " ?RA_FRIENDLY_NAME " ready", #{domain => ?RMQLOG_DOMAIN_GLOBAL}), @@ -583,3 +585,212 @@ is_mnesia_table_covered_by_feature_flag(rabbit_user) -> true; is_mnesia_table_covered_by_feature_flag(rabbit_user_permission) -> true; is_mnesia_table_covered_by_feature_flag(rabbit_topic_permission) -> true; is_mnesia_table_covered_by_feature_flag(_) -> false. + +register_projections() -> + RegisterFuns = [fun register_rabbit_exchange_projection/0, + fun register_rabbit_queue_projection/0, + fun register_rabbit_vhost_projection/0, + fun register_rabbit_users_projection/0, + fun register_rabbit_user_permissions_projection/0, + fun register_rabbit_bindings_projection/0, + fun register_rabbit_index_route_projection/0, + fun register_rabbit_topic_graph_projection/0], + [case RegisterFun() of + ok -> ok; + {error, exists} -> ok; + {error, Error} -> throw(Error) + end || RegisterFun <- RegisterFuns], + ok. + +register_rabbit_exchange_projection() -> + Name = rabbit_khepri_exchange, + PathPattern = [rabbit_store, + exchanges, + _VHost = ?KHEPRI_WILDCARD_STAR, + _Name = ?KHEPRI_WILDCARD_STAR], + KeyPos = #exchange.name, + register_simple_projection(Name, PathPattern, KeyPos). + +register_rabbit_queue_projection() -> + Name = rabbit_khepri_queue, + PathPattern = [rabbit_store, + queues, + _VHost = ?KHEPRI_WILDCARD_STAR, + _Name = ?KHEPRI_WILDCARD_STAR], + KeyPos = 2, %% #amqqueue.name + register_simple_projection(Name, PathPattern, KeyPos). + +register_rabbit_vhost_projection() -> + Name = rabbit_khepri_vhost, + PathPattern = [rabbit_vhost, _VHost = ?KHEPRI_WILDCARD_STAR], + KeyPos = 2, %% #vhost.virtual_host + register_simple_projection(Name, PathPattern, KeyPos). + +register_rabbit_users_projection() -> + Name = rabbit_khepri_users, + PathPattern = [rabbit_auth_backend_internal, + users, + _UserName = ?KHEPRI_WILDCARD_STAR], + KeyPos = 2, %% #internal_user.username + register_simple_projection(Name, PathPattern, KeyPos). + +register_rabbit_user_permissions_projection() -> + Name = rabbit_khepri_user_permissions, + PathPattern = [rabbit_auth_backend_internal, + users, + _UserName = ?KHEPRI_WILDCARD_STAR, + user_permissions, + _VHost = ?KHEPRI_WILDCARD_STAR], + KeyPos = #user_permission.user_vhost, + register_simple_projection(Name, PathPattern, KeyPos). + +register_simple_projection(Name, PathPattern, KeyPos) -> + Options = #{keypos => KeyPos}, + Fun = fun(_Path, Resource) -> Resource end, + Projection = khepri_projection:new(Name, Fun, Options), + khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + +register_rabbit_bindings_projection() -> + MapFun = fun(Path, Destination) -> + [rabbit_store, routing, VHost, ExchangeName, RoutingKey] = + Path, + Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), + #binding{source = Exchange, + key = RoutingKey, + destination = Destination} + end, + ProjectionFun = projection_fun_for_sets(MapFun), + Options = #{type => bag, keypos => #binding.source}, + Projection = khepri_projection:new( + rabbit_khepri_bindings, ProjectionFun, Options), + PathPattern = [rabbit_store, + routing, + _VHost = ?KHEPRI_WILDCARD_STAR, + _ExchangeName = ?KHEPRI_WILDCARD_STAR, + _RoutingKey = ?KHEPRI_WILDCARD_STAR], + khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + +register_rabbit_index_route_projection() -> + MapFun = fun(Path, Destination) -> + [rabbit_store, routing, VHost, ExchangeName, RoutingKey] = + Path, + Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), + SourceKey = {Exchange, RoutingKey}, + #index_route{source_key = SourceKey, + destination = Destination} + end, + ProjectionFun = projection_fun_for_sets(MapFun), + Options = #{type => bag, keypos => #index_route.source_key}, + Projection = khepri_projection:new( + rabbit_khepri_index_route, ProjectionFun, Options), + PathPattern = [rabbit_store, + routing, + ?KHEPRI_WILDCARD_STAR, + ?KHEPRI_WILDCARD_STAR, + ?KHEPRI_WILDCARD_STAR], + khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + +%% Routing information is stored in the Khepri store as a `set'. +%% In order to turn these bindings into records in an ETS `bag', we use a +%% `khepri_projection:extended_projection_fun()' to determine the changes +%% `khepri_projection' should apply to the ETS table using set algebra. +projection_fun_for_sets(MapFun) -> + fun (_Table, _Path, undefined, undefined) -> + ok; + (Table, Path, undefined, NewPayload) -> + ets:insert(Table, [MapFun(Path, Element) || + Element <- sets:to_list(NewPayload)]); + + (Table, Path, OldPayload, undefined) -> + sets:fold( + fun(Element, _Acc) -> + ets:delete_object(Table, MapFun(Path, Element)) + end, [], OldPayload); + (Table, Path, OldPayload, NewPayload) -> + Deletions = sets:subtract(OldPayload, NewPayload), + Creations = sets:subtract(NewPayload, OldPayload), + sets:fold( + fun(Element, _Acc) -> + ets:delete_object(Table, MapFun(Path, Element)) + end, [], Deletions), + ets:insert(Table, [MapFun(Path, Element) || + Element <- sets:to_list(Creations)]) + end. + +register_rabbit_topic_graph_projection() -> + Name = rabbit_khepri_topic_trie, + Options = #{keypos => #topic_trie_edge.trie_edge}, + Fun = fun project_topic_trie_binding/4, + Projection = khepri_projection:new(Name, Fun, Options), + PathPattern = [rabbit_store, + topic_trie_binding, + _VHost = ?KHEPRI_WILDCARD_STAR, + _ExchangeName = ?KHEPRI_WILDCARD_STAR, + _Routes = ?KHEPRI_WILDCARD_STAR_STAR], + khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection), + ok. + +project_topic_trie_binding(_Table, _Path, undefined, undefined) -> + ok; +project_topic_trie_binding(Table, Path, undefined, NewBindings) -> + Edges = edges_for_path(Path, NewBindings), + ets:insert(Table, Edges); +project_topic_trie_binding(Table, Path, OldBindings, undefined) -> + %% Delete the edge to the bindings and all edges for which the `ToNode' + %% has no outgoing edges. + [BindingEdge | RestEdges] = edges_for_path(Path, OldBindings), + ets:delete_object(Table, BindingEdge), + trim_while_out_degree_is_zero(RestEdges); +project_topic_trie_binding(Table, Path, _OldBindings, NewBindings) -> + [BindingEdge | _RestEdges] = edges_for_path(Path, NewBindings), + ets:insert(Table, BindingEdge). + +edges_for_path( + [rabbit_store, topic_trie_binding, VHost, ExchangeName | Components], + Bindings) -> + Exchange = rabbit_misc:r(VHost, exchange, ExchangeName), + edges_for_path([root | Components], Bindings, Exchange, []). + +edges_for_path([FromNodeId, To | Rest], Bindings, Exchange, Edges) -> + ToNodeId = [To | FromNodeId], + Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange, + node_id = FromNodeId, + word = To}, + node_id = ToNodeId}, + edges_for_path([ToNodeId | Rest], Bindings, Exchange, [Edge | Edges]); +edges_for_path([LeafId], Bindings, Exchange, Edges) -> + ToNodeId = {bindings, Bindings}, + Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange, + node_id = LeafId, + word = bindings}, + node_id = ToNodeId}, + [Edge | Edges]. + +-spec trim_while_out_degree_is_zero(Edges) -> ok + when + Edges :: [Edge], + Edge :: #topic_trie_edge{}. + +trim_while_out_degree_is_zero([]) -> + ok; +trim_while_out_degree_is_zero([Edge | Rest]) -> + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange, + node_id = _FromNodeId}, + node_id = ToNodeId} = Edge, + OutEdgePattern = #topic_trie_edge{trie_edge = + #trie_edge{exchange_name = Exchange, + node_id = ToNodeId, + word = '_'}, + node_id = '_'}, + case ets:match(rabbit_khepri_topic_trie, OutEdgePattern, 1) of + '$end_of_table' -> + %% If the ToNode has an out degree of zero, trim the edge to + %% the node, effectively erasing ToNode. + ets:delete_object(rabbit_khepri_topic_trie, Edge), + trim_while_out_degree_is_zero(Rest); + {_Match, _Continuation} -> + %% Return after finding the first node with a non-zero out-degree. + %% If a node has a non-zero out-degree then all of its ancestors + %% must as well. + ok + end. diff --git a/deps/rabbit/src/rabbit_store.erl b/deps/rabbit/src/rabbit_store.erl index 974671a0f9..a3f0bc0eff 100644 --- a/deps/rabbit/src/rabbit_store.erl +++ b/deps/rabbit/src/rabbit_store.erl @@ -241,12 +241,12 @@ match_exchanges(Pattern0) -> lookup_exchange(Name) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup({rabbit_exchange, Name}, mnesia) end, - fun() -> lookup(khepri_exchange_path(Name), khepri) end). + fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_exchange, Name) end). lookup_many_exchanges(Names) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup_many(rabbit_exchange, Names, mnesia) end, - fun() -> lookup_many(fun khepri_exchange_path/1, Names, khepri) end). + fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_exchange, Names) end). peek_exchange_serial(XName, LockType) -> rabbit_khepri:try_mnesia_or_khepri( @@ -489,39 +489,34 @@ list_bindings(VHost) -> [B || #route{binding = B} <- list_in_mnesia(rabbit_route, Match)] end, fun() -> - Path = khepri_routes_path() ++ [VHost, if_has_data_wildcard()], - lists:foldl(fun(SetOfBindings, Acc) -> - sets:to_list(SetOfBindings) ++ Acc - end, [], list_in_khepri(Path)) + VHostResource = rabbit_misc:r(VHost, '_'), + Match = #binding{source = VHostResource, + destination = VHostResource, + _ = '_'}, + ets:match(rabbit_khepri_bindings, Match) end). -list_bindings_for_source(#resource{virtual_host = VHost, name = Name} = Resource) -> +list_bindings_for_source(Source) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> - Route = #route{binding = #binding{source = Resource, _ = '_'}}, + Route = #route{binding = #binding{source = Source, _ = '_'}}, [B || #route{binding = B} <- list_in_mnesia(rabbit_route, Route)] end, - fun() -> - Path = khepri_routes_path() ++ [VHost, Name, if_has_data_wildcard()], - lists:foldl(fun(SetOfBindings, Acc) -> - sets:to_list(SetOfBindings) ++ Acc - end, [], list_in_khepri(Path)) - end). + fun() -> ets:lookup(rabbit_khepri_bindings, Source) end). -list_bindings_for_destination(#resource{virtual_host = VHost, name = Name, - kind = Kind} = Resource) -> +list_bindings_for_destination(Destination) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> - Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Resource, + Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Destination, _ = '_'}}), [rabbit_binding:reverse_binding(B) || #reverse_route{reverse_binding = B} <- list_in_mnesia(rabbit_reverse_route, Route)] end, fun() -> - Path = khepri_routes_path() ++ [VHost, ?KHEPRI_WILDCARD_STAR, Kind, Name, if_has_data_wildcard()], - lists:foldl(fun(SetOfBindings, Acc) -> - sets:to_list(SetOfBindings) ++ Acc - end, [], list_in_khepri(Path)) + %% TODO: projection for bindings indexed by destination? + Match = #binding{destination = Destination, + _ = '_'}, + ets:match(rabbit_khepri_bindings, Match) end). list_bindings_for_source_and_destination(SrcName, DstName) -> @@ -533,10 +528,10 @@ list_bindings_for_source_and_destination(SrcName, DstName) -> [B || #route{binding = B} <- list_in_mnesia(rabbit_route, Route)] end, fun() -> - Values = match_source_and_destination_in_khepri(SrcName, DstName), - lists:foldl(fun(SetOfBindings, Acc) -> - sets:to_list(SetOfBindings) ++ Acc - end, [], Values) + Match = #binding{source = SrcName, + destination = DstName, + _ = '_'}, + ets:match(rabbit_khepri_bindings, Match) end). list_explicit_bindings() -> @@ -728,12 +723,12 @@ store_durable_queue(Q) -> lookup_queue(Name) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup({rabbit_queue, Name}, mnesia) end, - fun() -> lookup(khepri_queue_path(Name), khepri) end). + fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_queue, Name) end). lookup_durable_queue(Name) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup({rabbit_durable_queue, Name}, mnesia) end, - fun() -> lookup(khepri_queue_path(Name), khepri) end). + fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_queue, Name) end). %% TODO this should be internal, it's here because of mirrored queues lookup_queue_in_khepri_tx(Name) -> @@ -742,14 +737,14 @@ lookup_queue_in_khepri_tx(Name) -> lookup_queues(Names) when is_list(Names) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup_many(rabbit_queue, Names, mnesia) end, - fun() -> lookup_many(fun khepri_queue_path/1, Names, khepri) end); + fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_queue, Names) end); lookup_queues(Name) -> lookup_queue(Name). lookup_durable_queues(Names) when is_list(Names) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> lookup_many(rabbit_durable_queue, Names, mnesia) end, - fun() -> lookup_many(fun khepri_queue_path/1, Names, khepri) end); + fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_queue, Names) end); lookup_durable_queues(Name) -> lookup_durable_queue(Name). @@ -759,7 +754,7 @@ exists_queue(Name) -> ets:member(rabbit_queue, Name) end, fun() -> - rabbit_khepri:exists(khepri_queue_path(Name)) + ets:member(rabbit_khepri_queue, Name) end). update_queue(#resource{virtual_host = VHost, name = Name} = QName, Fun) -> @@ -894,11 +889,9 @@ match_bindings(SrcName, Match) -> Routes, Match(Binding)] end, fun() -> - Data = match_source_in_khepri(SrcName), - Bindings = lists:foldl(fun(SetOfBindings, Acc) -> - sets:to_list(SetOfBindings) ++ Acc - end, [], maps:values(Data)), - [Dest || Binding = #binding{destination = Dest} <- Bindings, Match(Binding)] + Routes = ets:lookup(rabbit_khepri_bindings, SrcName), + [Dest || Binding = #binding{destination = Dest} <- + Routes, Match(Binding)] end). match_routing_key(SrcName, RoutingKeys, UseIndex) -> @@ -906,13 +899,18 @@ match_routing_key(SrcName, RoutingKeys, UseIndex) -> fun() -> case UseIndex of true -> - route_in_mnesia_v2(SrcName, RoutingKeys); + route_v2(rabbit_index_route, SrcName, RoutingKeys); _ -> route_in_mnesia_v1(SrcName, RoutingKeys) end end, fun() -> - match_source_and_key_in_khepri(SrcName, RoutingKeys) + case UseIndex of + true -> + route_v2(rabbit_khepri_index_route, SrcName, RoutingKeys); + _ -> + match_source_and_key_in_khepri(SrcName, RoutingKeys) + end end). route_in_mnesia_v1(SrcName, [RoutingKey]) -> @@ -945,27 +943,27 @@ route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) -> %% ets:select/2 is expensive because it needs to compile the match spec every %% time and lookup does not happen by a hash key. %% -%% In contrast, route_v2/2 increases end-to-end message sending throughput +%% In contrast, route_v2/3 increases end-to-end message sending throughput %% (i.e. from RabbitMQ client to the queue process) by up to 35% by using ets:lookup_element/3. %% Only the direct exchange type uses the rabbit_index_route table to store its %% bindings by table key tuple {SourceExchange, RoutingKey}. --spec route_in_mnesia_v2(rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) -> +-spec route_v2(ets:table(), rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) -> rabbit_router:match_result(). -route_in_mnesia_v2(SrcName, [RoutingKey]) -> +route_v2(Table, SrcName, [RoutingKey]) -> %% optimization - destinations(SrcName, RoutingKey); -route_in_mnesia_v2(SrcName, [_|_] = RoutingKeys) -> + destinations(Table, SrcName, RoutingKey); +route_v2(Table, SrcName, [_|_] = RoutingKeys) -> lists:flatmap(fun(Key) -> - destinations(SrcName, Key) + destinations(Table, SrcName, Key) end, RoutingKeys). -destinations(SrcName, RoutingKey) -> +destinations(Table, SrcName, RoutingKey) -> %% Prefer try-catch block over checking Key existence with ets:member/2. %% The latter reduces throughput by a few thousand messages per second because %% of function db_member_hash in file erl_db_hash.c. %% We optimise for the happy path, that is the binding / table key is present. try - ets:lookup_element(rabbit_index_route, + ets:lookup_element(Table, {SrcName, RoutingKey}, #index_route.destination) catch @@ -1003,26 +1001,8 @@ add_topic_trie_binding_tx(XName, RoutingKey, Destination, Args) -> ok = khepri_tx:put(Path, Set). route_delivery_for_exchange_type_topic(XName, RoutingKey) -> - Words = lists:map(fun(W) -> #if_any{conditions = [W, <<"*">>]} end, - split_topic_trie_key(RoutingKey)), - Root = khepri_exchange_type_topic_path(XName), - Path0 = Root ++ Words, - {Hd, [Tl]} = lists:split(length(Path0) - 1, Path0), - Path = Hd ++ [if_has_data([Tl])], - Fanout = Root ++ [<<"#">>], - Map = case rabbit_khepri:get(Fanout) of - {ok, Data} when Data =/= undefined -> - #{Fanout => Data}; - _ -> - case rabbit_khepri:match(Path) of - {ok, Map0} -> Map0; - _ -> #{} - end - end, - maps:fold(fun(_, Data, Acc) -> - Bindings = sets:to_list(Data), - [maps:get(destination, B) || B <- Bindings] ++ Acc - end, [], Map). + Words = split_topic_trie_key(RoutingKey), + trie_match(XName, Words). delete_topic_trie_bindings_for_exchange(XName) -> ok = rabbit_khepri:delete(khepri_exchange_type_topic_path(XName)). @@ -1932,22 +1912,15 @@ binding_has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) - Error end. -match_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> - Path = khepri_routes_path() ++ [VHost, Name, if_has_data_wildcard()], - {ok, Map} = rabbit_khepri:match(Path), - Map. - match_source_and_key_in_khepri(Src, ['_']) -> - Path = khepri_routing_path(Src, if_has_data_wildcard()), - case rabbit_khepri:match(Path) of - {ok, Map} -> - maps:fold(fun(_, Dsts, Acc) -> - sets:to_list(Dsts) ++ Acc - end, [], Map); - {error, {khepri, node_not_found, _}} -> + try + ets:lookup_element(rabbit_khepri_bindings, Src, #binding.destination) + catch + error:badarg -> [] end; match_source_and_key_in_khepri(Src, RoutingKeys) -> + %% TODO: use {@link ets:select/2} lists:foldl( fun(RK, Acc) -> Path = khepri_routing_path(Src, RK), @@ -2302,3 +2275,63 @@ recover_mnesia_tables() -> ++ [Table || {Table, _} <- rabbit_table:definitions()], [mnesia:change_table_access_mode(Table, read_write) || Table <- Tables], ok. + +lookup_resource_in_khepri_projection(Projection, Key) -> + case ets:lookup(Projection, Key) of + [Resource] -> {ok, Resource}; + [] -> {error, not_found} + end. + +lookup_resources_in_khepri_projection(Projection, Keys) -> + lists:append([ets:lookup(Projection, Name) || Name <- Keys]). + +%%--->8--- Khepri Topic Exchange Routing --->8--- +%% This is slightly different than the mnesia version. We use only one +%% table. Bindings are the leaf nodes of the trie. + +%% Also, lists are rewritten as binaries. Why does this work before? + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, <<"#">>, fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {<<"*">>, fun trie_match/4, RestW}, + {<<"#">>, fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +trie_child(X, Node, Word) -> + case ets:lookup(rabbit_khepri_topic_trie, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + case ets:lookup(rabbit_khepri_topic_trie, + #trie_edge{exchange_name = X, + node_id = Node, + word = bindings}) of + [#topic_trie_edge{node_id = {bindings, Bindings}}] -> + [Dest || #{destination := Dest} <- sets:to_list(Bindings)]; + [] -> + [] + end. diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 4fd7a76286..3d464f22be 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -580,8 +580,7 @@ exists_in_mnesia(VHost) -> mnesia:dirty_read({rabbit_vhost, VHost}) /= []. exists_in_khepri(VHost) -> - Path = khepri_vhost_path(VHost), - rabbit_khepri:exists(Path). + ets:member(rabbit_khepri_vhost, VHost). -spec list_names() -> [vhost:name()]. list_names() -> @@ -593,11 +592,7 @@ list_names_in_mnesia() -> mnesia:dirty_all_keys(rabbit_vhost). list_names_in_khepri() -> - Path = khepri_vhosts_path(), - case rabbit_khepri:list_child_nodes(Path) of - {ok, Result} -> Result; - _ -> [] - end. + ets:select(rabbit_khepri_vhost, [{vhost:pattern_match_names(), [], ['$1']}]). %% Exists for backwards compatibility, prefer list_names/0. -spec list() -> [vhost:name()]. @@ -613,11 +608,7 @@ all_in_mnesia() -> mnesia:dirty_match_object(rabbit_vhost, vhost:pattern_match_all()). all_in_khepri() -> - Path = khepri_vhosts_path(), - case rabbit_khepri:list(Path) of - {ok, VHosts} -> maps:values(VHosts); - _ -> [] - end. + ets:tab2list(rabbit_khepri_vhost). -spec all_tagged_with(atom()) -> [vhost:vhost()]. all_tagged_with(TagName) -> @@ -655,10 +646,9 @@ lookup_in_mnesia(VHostName) -> end. lookup_in_khepri(VHostName) -> - Path = khepri_vhost_path(VHostName), - case rabbit_khepri:get(Path) of - {ok, Record} -> Record; - _ -> {error, {no_such_vhost, VHostName}} + case ets:lookup(rabbit_khepri_vhost, VHostName) of + [Record] -> Record; + _ -> {error, {no_such_vhost, VHostName}} end. with_in_mnesia(VHostName, Thunk) -> @@ -869,10 +859,9 @@ info_in_mnesia(Key) -> end. info_in_khepri(Key) -> - Path = khepri_vhost_path(Key), - case rabbit_khepri:get(Path) of - {ok, VHost} -> infos(?INFO_KEYS, VHost); - _ -> [] + case ets:lookup(rabbit_khepri_vhost, Key) of + [VHost] -> infos(?INFO_KEYS, VHost); + _ -> [] end. -spec info(vhost:vhost(), rabbit_types:info_keys()) -> rabbit_types:infos(). diff --git a/deps/rabbit/src/vhost.erl b/deps/rabbit/src/vhost.erl index cdd3ba3ce5..fb0cb99c6d 100644 --- a/deps/rabbit/src/vhost.erl +++ b/deps/rabbit/src/vhost.erl @@ -20,6 +20,7 @@ upgrade/1, upgrade_to/2, pattern_match_all/0, + pattern_match_names/0, get_name/1, get_limits/1, get_metadata/1, @@ -46,7 +47,7 @@ -record(vhost, { %% name as a binary - virtual_host :: name() | '_', + virtual_host :: name() | '_' | '$1', %% proplist of limits configured, if any limits :: list() | '_', metadata :: metadata() | '_' @@ -60,7 +61,7 @@ -type vhost_pattern() :: vhost_v2_pattern(). -type vhost_v2_pattern() :: #vhost{ - virtual_host :: name() | '_', + virtual_host :: name() | '_' | '$1', limits :: '_', metadata :: '_' }. @@ -112,10 +113,13 @@ info_keys() -> cluster_state]. -spec pattern_match_all() -> vhost_pattern(). - pattern_match_all() -> #vhost{_ = '_'}. +-spec pattern_match_names() -> vhost_pattern(). +pattern_match_names() -> + #vhost{virtual_host = '$1', _ = '_'}. + -spec get_name(vhost()) -> name(). get_name(#vhost{virtual_host = Value}) -> Value. diff --git a/workspace_helpers.bzl b/workspace_helpers.bzl index 625b10ed8e..45b6e38932 100644 --- a/workspace_helpers.bzl +++ b/workspace_helpers.bzl @@ -125,8 +125,8 @@ def rabbitmq_external_deps(rabbitmq_workspace = "@rabbitmq-server"): github_erlang_app( name = "khepri", org = "rabbitmq", - ref = "main", - version = "main", + ref = "b7f9bac94857078a0b0584aa72a920f7596a1e49", + version = "b7f9bac94857078a0b0584aa72a920f7596a1e49", build_file = rabbitmq_workspace + "//:BUILD.khepri", ) @@ -180,11 +180,16 @@ sed -i"_orig" -E '/VERSION/ s/[0-9]+\\.[0-9]+\\.[0-9]+/'${VERSION}'/' BUILD.baze sha256 = "282a8a323ca2a845c9e6f787d166348f776c1d4a41ede63046d72d422e3da946", ) - git_repository( + hex_pm_erlang_app( name = "ra", - branch = "main", - remote = "https://github.com/rabbitmq/ra.git", - patch_cmds = [RA_INJECT_GIT_VERSION], + version = "2.4.0", + sha256 = "f716146ee1755823fbff741669053efc76e241caa95465782fc4c0a4389eb40c", + deps = [ + "@gen_batch_server//:erlang_app", + ], + runtime_deps = [ + "@aten//:erlang_app", + ] ) hex_archive( |