summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Davis <mcarsondavis@gmail.com>2022-10-21 15:24:37 -0500
committerMichael Davis <mcarsondavis@gmail.com>2022-11-07 10:44:34 -0600
commit5e4a92a8b9dc49859ccefb56e01e7e56761b4207 (patch)
treeba9ec57d63f643fb8d5763d5baa1947b36345c4f
parent86204d17f83a7f78f6f04856361ba8ac2ae14cb7 (diff)
downloadrabbitmq-server-git-khepri-projections.tar.gz
WIP projectionskhepri-projections
-rw-r--r--deps/rabbit/src/rabbit_auth_backend_internal.erl23
-rw-r--r--deps/rabbit/src/rabbit_khepri.erl211
-rw-r--r--deps/rabbit/src/rabbit_store.erl187
-rw-r--r--deps/rabbit/src/rabbit_vhost.erl29
-rw-r--r--deps/rabbit/src/vhost.erl10
-rw-r--r--workspace_helpers.bzl17
6 files changed, 361 insertions, 116 deletions
diff --git a/deps/rabbit/src/rabbit_auth_backend_internal.erl b/deps/rabbit/src/rabbit_auth_backend_internal.erl
index 85c6e429d5..e31341ba35 100644
--- a/deps/rabbit/src/rabbit_auth_backend_internal.erl
+++ b/deps/rabbit/src/rabbit_auth_backend_internal.erl
@@ -251,12 +251,16 @@ check_resource_access_in_mnesia(Username, VHostPath, Name, Permission) ->
end.
check_resource_access_in_khepri(Username, VHostPath, Name, Permission) ->
- Path = khepri_user_permission_path(Username, VHostPath),
- case rabbit_khepri:get(Path) of
- {ok, #user_permission{permission = P}} ->
- do_check_resource_access(Name, Permission, P);
- _ ->
- false
+ UserVHost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ try
+ P = ets:lookup_element(
+ rabbit_khepri_user_permissions,
+ UserVHost,
+ #user_permission.permission),
+ do_check_resource_access(Name, Permission, P)
+ catch
+ _:_:_ -> false
end.
do_check_resource_access(Name, Permission, P) ->
@@ -521,10 +525,9 @@ lookup_user_in_mnesia(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
lookup_user_in_khepri(Username) ->
- Path = khepri_user_path(Username),
- case rabbit_khepri:get(Path) of
- {ok, User} -> {ok, User};
- _ -> {error, not_found}
+ case ets:lookup(rabbit_khepri_users, Username) of
+ [User] -> {ok, User};
+ _ -> {error, not_found}
end.
-spec exists(rabbit_types:username()) -> boolean().
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index a2872522d5..e1372c3168 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -12,6 +12,7 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/logging.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([setup/0,
setup/1,
@@ -99,6 +100,7 @@ setup(_) ->
friendly_name => ?RA_FRIENDLY_NAME},
case khepri:start(?RA_SYSTEM, RaServerConfig) of
{ok, ?STORE_ID} ->
+ register_projections(),
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
@@ -583,3 +585,212 @@ is_mnesia_table_covered_by_feature_flag(rabbit_user) -> true;
is_mnesia_table_covered_by_feature_flag(rabbit_user_permission) -> true;
is_mnesia_table_covered_by_feature_flag(rabbit_topic_permission) -> true;
is_mnesia_table_covered_by_feature_flag(_) -> false.
+
+register_projections() ->
+ RegisterFuns = [fun register_rabbit_exchange_projection/0,
+ fun register_rabbit_queue_projection/0,
+ fun register_rabbit_vhost_projection/0,
+ fun register_rabbit_users_projection/0,
+ fun register_rabbit_user_permissions_projection/0,
+ fun register_rabbit_bindings_projection/0,
+ fun register_rabbit_index_route_projection/0,
+ fun register_rabbit_topic_graph_projection/0],
+ [case RegisterFun() of
+ ok -> ok;
+ {error, exists} -> ok;
+ {error, Error} -> throw(Error)
+ end || RegisterFun <- RegisterFuns],
+ ok.
+
+register_rabbit_exchange_projection() ->
+ Name = rabbit_khepri_exchange,
+ PathPattern = [rabbit_store,
+ exchanges,
+ _VHost = ?KHEPRI_WILDCARD_STAR,
+ _Name = ?KHEPRI_WILDCARD_STAR],
+ KeyPos = #exchange.name,
+ register_simple_projection(Name, PathPattern, KeyPos).
+
+register_rabbit_queue_projection() ->
+ Name = rabbit_khepri_queue,
+ PathPattern = [rabbit_store,
+ queues,
+ _VHost = ?KHEPRI_WILDCARD_STAR,
+ _Name = ?KHEPRI_WILDCARD_STAR],
+ KeyPos = 2, %% #amqqueue.name
+ register_simple_projection(Name, PathPattern, KeyPos).
+
+register_rabbit_vhost_projection() ->
+ Name = rabbit_khepri_vhost,
+ PathPattern = [rabbit_vhost, _VHost = ?KHEPRI_WILDCARD_STAR],
+ KeyPos = 2, %% #vhost.virtual_host
+ register_simple_projection(Name, PathPattern, KeyPos).
+
+register_rabbit_users_projection() ->
+ Name = rabbit_khepri_users,
+ PathPattern = [rabbit_auth_backend_internal,
+ users,
+ _UserName = ?KHEPRI_WILDCARD_STAR],
+ KeyPos = 2, %% #internal_user.username
+ register_simple_projection(Name, PathPattern, KeyPos).
+
+register_rabbit_user_permissions_projection() ->
+ Name = rabbit_khepri_user_permissions,
+ PathPattern = [rabbit_auth_backend_internal,
+ users,
+ _UserName = ?KHEPRI_WILDCARD_STAR,
+ user_permissions,
+ _VHost = ?KHEPRI_WILDCARD_STAR],
+ KeyPos = #user_permission.user_vhost,
+ register_simple_projection(Name, PathPattern, KeyPos).
+
+register_simple_projection(Name, PathPattern, KeyPos) ->
+ Options = #{keypos => KeyPos},
+ Fun = fun(_Path, Resource) -> Resource end,
+ Projection = khepri_projection:new(Name, Fun, Options),
+ khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection).
+
+register_rabbit_bindings_projection() ->
+ MapFun = fun(Path, Destination) ->
+ [rabbit_store, routing, VHost, ExchangeName, RoutingKey] =
+ Path,
+ Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
+ #binding{source = Exchange,
+ key = RoutingKey,
+ destination = Destination}
+ end,
+ ProjectionFun = projection_fun_for_sets(MapFun),
+ Options = #{type => bag, keypos => #binding.source},
+ Projection = khepri_projection:new(
+ rabbit_khepri_bindings, ProjectionFun, Options),
+ PathPattern = [rabbit_store,
+ routing,
+ _VHost = ?KHEPRI_WILDCARD_STAR,
+ _ExchangeName = ?KHEPRI_WILDCARD_STAR,
+ _RoutingKey = ?KHEPRI_WILDCARD_STAR],
+ khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection).
+
+register_rabbit_index_route_projection() ->
+ MapFun = fun(Path, Destination) ->
+ [rabbit_store, routing, VHost, ExchangeName, RoutingKey] =
+ Path,
+ Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
+ SourceKey = {Exchange, RoutingKey},
+ #index_route{source_key = SourceKey,
+ destination = Destination}
+ end,
+ ProjectionFun = projection_fun_for_sets(MapFun),
+ Options = #{type => bag, keypos => #index_route.source_key},
+ Projection = khepri_projection:new(
+ rabbit_khepri_index_route, ProjectionFun, Options),
+ PathPattern = [rabbit_store,
+ routing,
+ ?KHEPRI_WILDCARD_STAR,
+ ?KHEPRI_WILDCARD_STAR,
+ ?KHEPRI_WILDCARD_STAR],
+ khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection).
+
+%% Routing information is stored in the Khepri store as a `set'.
+%% In order to turn these bindings into records in an ETS `bag', we use a
+%% `khepri_projection:extended_projection_fun()' to determine the changes
+%% `khepri_projection' should apply to the ETS table using set algebra.
+projection_fun_for_sets(MapFun) ->
+ fun (_Table, _Path, undefined, undefined) ->
+ ok;
+ (Table, Path, undefined, NewPayload) ->
+ ets:insert(Table, [MapFun(Path, Element) ||
+ Element <- sets:to_list(NewPayload)]);
+
+ (Table, Path, OldPayload, undefined) ->
+ sets:fold(
+ fun(Element, _Acc) ->
+ ets:delete_object(Table, MapFun(Path, Element))
+ end, [], OldPayload);
+ (Table, Path, OldPayload, NewPayload) ->
+ Deletions = sets:subtract(OldPayload, NewPayload),
+ Creations = sets:subtract(NewPayload, OldPayload),
+ sets:fold(
+ fun(Element, _Acc) ->
+ ets:delete_object(Table, MapFun(Path, Element))
+ end, [], Deletions),
+ ets:insert(Table, [MapFun(Path, Element) ||
+ Element <- sets:to_list(Creations)])
+ end.
+
+register_rabbit_topic_graph_projection() ->
+ Name = rabbit_khepri_topic_trie,
+ Options = #{keypos => #topic_trie_edge.trie_edge},
+ Fun = fun project_topic_trie_binding/4,
+ Projection = khepri_projection:new(Name, Fun, Options),
+ PathPattern = [rabbit_store,
+ topic_trie_binding,
+ _VHost = ?KHEPRI_WILDCARD_STAR,
+ _ExchangeName = ?KHEPRI_WILDCARD_STAR,
+ _Routes = ?KHEPRI_WILDCARD_STAR_STAR],
+ khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection),
+ ok.
+
+project_topic_trie_binding(_Table, _Path, undefined, undefined) ->
+ ok;
+project_topic_trie_binding(Table, Path, undefined, NewBindings) ->
+ Edges = edges_for_path(Path, NewBindings),
+ ets:insert(Table, Edges);
+project_topic_trie_binding(Table, Path, OldBindings, undefined) ->
+ %% Delete the edge to the bindings and all edges for which the `ToNode'
+ %% has no outgoing edges.
+ [BindingEdge | RestEdges] = edges_for_path(Path, OldBindings),
+ ets:delete_object(Table, BindingEdge),
+ trim_while_out_degree_is_zero(RestEdges);
+project_topic_trie_binding(Table, Path, _OldBindings, NewBindings) ->
+ [BindingEdge | _RestEdges] = edges_for_path(Path, NewBindings),
+ ets:insert(Table, BindingEdge).
+
+edges_for_path(
+ [rabbit_store, topic_trie_binding, VHost, ExchangeName | Components],
+ Bindings) ->
+ Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
+ edges_for_path([root | Components], Bindings, Exchange, []).
+
+edges_for_path([FromNodeId, To | Rest], Bindings, Exchange, Edges) ->
+ ToNodeId = [To | FromNodeId],
+ Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
+ node_id = FromNodeId,
+ word = To},
+ node_id = ToNodeId},
+ edges_for_path([ToNodeId | Rest], Bindings, Exchange, [Edge | Edges]);
+edges_for_path([LeafId], Bindings, Exchange, Edges) ->
+ ToNodeId = {bindings, Bindings},
+ Edge = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
+ node_id = LeafId,
+ word = bindings},
+ node_id = ToNodeId},
+ [Edge | Edges].
+
+-spec trim_while_out_degree_is_zero(Edges) -> ok
+ when
+ Edges :: [Edge],
+ Edge :: #topic_trie_edge{}.
+
+trim_while_out_degree_is_zero([]) ->
+ ok;
+trim_while_out_degree_is_zero([Edge | Rest]) ->
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = Exchange,
+ node_id = _FromNodeId},
+ node_id = ToNodeId} = Edge,
+ OutEdgePattern = #topic_trie_edge{trie_edge =
+ #trie_edge{exchange_name = Exchange,
+ node_id = ToNodeId,
+ word = '_'},
+ node_id = '_'},
+ case ets:match(rabbit_khepri_topic_trie, OutEdgePattern, 1) of
+ '$end_of_table' ->
+ %% If the ToNode has an out degree of zero, trim the edge to
+ %% the node, effectively erasing ToNode.
+ ets:delete_object(rabbit_khepri_topic_trie, Edge),
+ trim_while_out_degree_is_zero(Rest);
+ {_Match, _Continuation} ->
+ %% Return after finding the first node with a non-zero out-degree.
+ %% If a node has a non-zero out-degree then all of its ancestors
+ %% must as well.
+ ok
+ end.
diff --git a/deps/rabbit/src/rabbit_store.erl b/deps/rabbit/src/rabbit_store.erl
index 974671a0f9..a3f0bc0eff 100644
--- a/deps/rabbit/src/rabbit_store.erl
+++ b/deps/rabbit/src/rabbit_store.erl
@@ -241,12 +241,12 @@ match_exchanges(Pattern0) ->
lookup_exchange(Name) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup({rabbit_exchange, Name}, mnesia) end,
- fun() -> lookup(khepri_exchange_path(Name), khepri) end).
+ fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_exchange, Name) end).
lookup_many_exchanges(Names) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup_many(rabbit_exchange, Names, mnesia) end,
- fun() -> lookup_many(fun khepri_exchange_path/1, Names, khepri) end).
+ fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_exchange, Names) end).
peek_exchange_serial(XName, LockType) ->
rabbit_khepri:try_mnesia_or_khepri(
@@ -489,39 +489,34 @@ list_bindings(VHost) ->
[B || #route{binding = B} <- list_in_mnesia(rabbit_route, Match)]
end,
fun() ->
- Path = khepri_routes_path() ++ [VHost, if_has_data_wildcard()],
- lists:foldl(fun(SetOfBindings, Acc) ->
- sets:to_list(SetOfBindings) ++ Acc
- end, [], list_in_khepri(Path))
+ VHostResource = rabbit_misc:r(VHost, '_'),
+ Match = #binding{source = VHostResource,
+ destination = VHostResource,
+ _ = '_'},
+ ets:match(rabbit_khepri_bindings, Match)
end).
-list_bindings_for_source(#resource{virtual_host = VHost, name = Name} = Resource) ->
+list_bindings_for_source(Source) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- Route = #route{binding = #binding{source = Resource, _ = '_'}},
+ Route = #route{binding = #binding{source = Source, _ = '_'}},
[B || #route{binding = B} <- list_in_mnesia(rabbit_route, Route)]
end,
- fun() ->
- Path = khepri_routes_path() ++ [VHost, Name, if_has_data_wildcard()],
- lists:foldl(fun(SetOfBindings, Acc) ->
- sets:to_list(SetOfBindings) ++ Acc
- end, [], list_in_khepri(Path))
- end).
+ fun() -> ets:lookup(rabbit_khepri_bindings, Source) end).
-list_bindings_for_destination(#resource{virtual_host = VHost, name = Name,
- kind = Kind} = Resource) ->
+list_bindings_for_destination(Destination) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Resource,
+ Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Destination,
_ = '_'}}),
[rabbit_binding:reverse_binding(B) ||
#reverse_route{reverse_binding = B} <- list_in_mnesia(rabbit_reverse_route, Route)]
end,
fun() ->
- Path = khepri_routes_path() ++ [VHost, ?KHEPRI_WILDCARD_STAR, Kind, Name, if_has_data_wildcard()],
- lists:foldl(fun(SetOfBindings, Acc) ->
- sets:to_list(SetOfBindings) ++ Acc
- end, [], list_in_khepri(Path))
+ %% TODO: projection for bindings indexed by destination?
+ Match = #binding{destination = Destination,
+ _ = '_'},
+ ets:match(rabbit_khepri_bindings, Match)
end).
list_bindings_for_source_and_destination(SrcName, DstName) ->
@@ -533,10 +528,10 @@ list_bindings_for_source_and_destination(SrcName, DstName) ->
[B || #route{binding = B} <- list_in_mnesia(rabbit_route, Route)]
end,
fun() ->
- Values = match_source_and_destination_in_khepri(SrcName, DstName),
- lists:foldl(fun(SetOfBindings, Acc) ->
- sets:to_list(SetOfBindings) ++ Acc
- end, [], Values)
+ Match = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'},
+ ets:match(rabbit_khepri_bindings, Match)
end).
list_explicit_bindings() ->
@@ -728,12 +723,12 @@ store_durable_queue(Q) ->
lookup_queue(Name) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup({rabbit_queue, Name}, mnesia) end,
- fun() -> lookup(khepri_queue_path(Name), khepri) end).
+ fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_queue, Name) end).
lookup_durable_queue(Name) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup({rabbit_durable_queue, Name}, mnesia) end,
- fun() -> lookup(khepri_queue_path(Name), khepri) end).
+ fun() -> lookup_resource_in_khepri_projection(rabbit_khepri_queue, Name) end).
%% TODO this should be internal, it's here because of mirrored queues
lookup_queue_in_khepri_tx(Name) ->
@@ -742,14 +737,14 @@ lookup_queue_in_khepri_tx(Name) ->
lookup_queues(Names) when is_list(Names) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup_many(rabbit_queue, Names, mnesia) end,
- fun() -> lookup_many(fun khepri_queue_path/1, Names, khepri) end);
+ fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_queue, Names) end);
lookup_queues(Name) ->
lookup_queue(Name).
lookup_durable_queues(Names) when is_list(Names) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> lookup_many(rabbit_durable_queue, Names, mnesia) end,
- fun() -> lookup_many(fun khepri_queue_path/1, Names, khepri) end);
+ fun() -> lookup_resources_in_khepri_projection(rabbit_khepri_queue, Names) end);
lookup_durable_queues(Name) ->
lookup_durable_queue(Name).
@@ -759,7 +754,7 @@ exists_queue(Name) ->
ets:member(rabbit_queue, Name)
end,
fun() ->
- rabbit_khepri:exists(khepri_queue_path(Name))
+ ets:member(rabbit_khepri_queue, Name)
end).
update_queue(#resource{virtual_host = VHost, name = Name} = QName, Fun) ->
@@ -894,11 +889,9 @@ match_bindings(SrcName, Match) ->
Routes, Match(Binding)]
end,
fun() ->
- Data = match_source_in_khepri(SrcName),
- Bindings = lists:foldl(fun(SetOfBindings, Acc) ->
- sets:to_list(SetOfBindings) ++ Acc
- end, [], maps:values(Data)),
- [Dest || Binding = #binding{destination = Dest} <- Bindings, Match(Binding)]
+ Routes = ets:lookup(rabbit_khepri_bindings, SrcName),
+ [Dest || Binding = #binding{destination = Dest} <-
+ Routes, Match(Binding)]
end).
match_routing_key(SrcName, RoutingKeys, UseIndex) ->
@@ -906,13 +899,18 @@ match_routing_key(SrcName, RoutingKeys, UseIndex) ->
fun() ->
case UseIndex of
true ->
- route_in_mnesia_v2(SrcName, RoutingKeys);
+ route_v2(rabbit_index_route, SrcName, RoutingKeys);
_ ->
route_in_mnesia_v1(SrcName, RoutingKeys)
end
end,
fun() ->
- match_source_and_key_in_khepri(SrcName, RoutingKeys)
+ case UseIndex of
+ true ->
+ route_v2(rabbit_khepri_index_route, SrcName, RoutingKeys);
+ _ ->
+ match_source_and_key_in_khepri(SrcName, RoutingKeys)
+ end
end).
route_in_mnesia_v1(SrcName, [RoutingKey]) ->
@@ -945,27 +943,27 @@ route_in_mnesia_v1(SrcName, [_|_] = RoutingKeys) ->
%% ets:select/2 is expensive because it needs to compile the match spec every
%% time and lookup does not happen by a hash key.
%%
-%% In contrast, route_v2/2 increases end-to-end message sending throughput
+%% In contrast, route_v2/3 increases end-to-end message sending throughput
%% (i.e. from RabbitMQ client to the queue process) by up to 35% by using ets:lookup_element/3.
%% Only the direct exchange type uses the rabbit_index_route table to store its
%% bindings by table key tuple {SourceExchange, RoutingKey}.
--spec route_in_mnesia_v2(rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) ->
+-spec route_v2(ets:table(), rabbit_types:binding_source(), [rabbit_router:routing_key(), ...]) ->
rabbit_router:match_result().
-route_in_mnesia_v2(SrcName, [RoutingKey]) ->
+route_v2(Table, SrcName, [RoutingKey]) ->
%% optimization
- destinations(SrcName, RoutingKey);
-route_in_mnesia_v2(SrcName, [_|_] = RoutingKeys) ->
+ destinations(Table, SrcName, RoutingKey);
+route_v2(Table, SrcName, [_|_] = RoutingKeys) ->
lists:flatmap(fun(Key) ->
- destinations(SrcName, Key)
+ destinations(Table, SrcName, Key)
end, RoutingKeys).
-destinations(SrcName, RoutingKey) ->
+destinations(Table, SrcName, RoutingKey) ->
%% Prefer try-catch block over checking Key existence with ets:member/2.
%% The latter reduces throughput by a few thousand messages per second because
%% of function db_member_hash in file erl_db_hash.c.
%% We optimise for the happy path, that is the binding / table key is present.
try
- ets:lookup_element(rabbit_index_route,
+ ets:lookup_element(Table,
{SrcName, RoutingKey},
#index_route.destination)
catch
@@ -1003,26 +1001,8 @@ add_topic_trie_binding_tx(XName, RoutingKey, Destination, Args) ->
ok = khepri_tx:put(Path, Set).
route_delivery_for_exchange_type_topic(XName, RoutingKey) ->
- Words = lists:map(fun(W) -> #if_any{conditions = [W, <<"*">>]} end,
- split_topic_trie_key(RoutingKey)),
- Root = khepri_exchange_type_topic_path(XName),
- Path0 = Root ++ Words,
- {Hd, [Tl]} = lists:split(length(Path0) - 1, Path0),
- Path = Hd ++ [if_has_data([Tl])],
- Fanout = Root ++ [<<"#">>],
- Map = case rabbit_khepri:get(Fanout) of
- {ok, Data} when Data =/= undefined ->
- #{Fanout => Data};
- _ ->
- case rabbit_khepri:match(Path) of
- {ok, Map0} -> Map0;
- _ -> #{}
- end
- end,
- maps:fold(fun(_, Data, Acc) ->
- Bindings = sets:to_list(Data),
- [maps:get(destination, B) || B <- Bindings] ++ Acc
- end, [], Map).
+ Words = split_topic_trie_key(RoutingKey),
+ trie_match(XName, Words).
delete_topic_trie_bindings_for_exchange(XName) ->
ok = rabbit_khepri:delete(khepri_exchange_type_topic_path(XName)).
@@ -1932,22 +1912,15 @@ binding_has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -
Error
end.
-match_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
- Path = khepri_routes_path() ++ [VHost, Name, if_has_data_wildcard()],
- {ok, Map} = rabbit_khepri:match(Path),
- Map.
-
match_source_and_key_in_khepri(Src, ['_']) ->
- Path = khepri_routing_path(Src, if_has_data_wildcard()),
- case rabbit_khepri:match(Path) of
- {ok, Map} ->
- maps:fold(fun(_, Dsts, Acc) ->
- sets:to_list(Dsts) ++ Acc
- end, [], Map);
- {error, {khepri, node_not_found, _}} ->
+ try
+ ets:lookup_element(rabbit_khepri_bindings, Src, #binding.destination)
+ catch
+ error:badarg ->
[]
end;
match_source_and_key_in_khepri(Src, RoutingKeys) ->
+ %% TODO: use {@link ets:select/2}
lists:foldl(
fun(RK, Acc) ->
Path = khepri_routing_path(Src, RK),
@@ -2302,3 +2275,63 @@ recover_mnesia_tables() ->
++ [Table || {Table, _} <- rabbit_table:definitions()],
[mnesia:change_table_access_mode(Table, read_write) || Table <- Tables],
ok.
+
+lookup_resource_in_khepri_projection(Projection, Key) ->
+ case ets:lookup(Projection, Key) of
+ [Resource] -> {ok, Resource};
+ [] -> {error, not_found}
+ end.
+
+lookup_resources_in_khepri_projection(Projection, Keys) ->
+ lists:append([ets:lookup(Projection, Name) || Name <- Keys]).
+
+%%--->8--- Khepri Topic Exchange Routing --->8---
+%% This is slightly different than the mnesia version. We use only one
+%% table. Bindings are the leaf nodes of the trie.
+
+%% Also, lists are rewritten as binaries. Why does this work before?
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words, []).
+
+trie_match(X, Node, [], ResAcc) ->
+ trie_match_part(X, Node, <<"#">>, fun trie_match_skip_any/4, [],
+ trie_bindings(X, Node) ++ ResAcc);
+trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
+ lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
+ trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
+ end, ResAcc, [{W, fun trie_match/4, RestW},
+ {<<"*">>, fun trie_match/4, RestW},
+ {<<"#">>, fun trie_match_skip_any/4, Words}]).
+
+trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
+ case trie_child(X, Node, Search) of
+ {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
+ error -> ResAcc
+ end.
+
+trie_match_skip_any(X, Node, [], ResAcc) ->
+ trie_match(X, Node, [], ResAcc);
+trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
+ trie_match_skip_any(X, Node, RestW,
+ trie_match(X, Node, Words, ResAcc)).
+
+trie_child(X, Node, Word) ->
+ case ets:lookup(rabbit_khepri_topic_trie,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}) of
+ [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ case ets:lookup(rabbit_khepri_topic_trie,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = bindings}) of
+ [#topic_trie_edge{node_id = {bindings, Bindings}}] ->
+ [Dest || #{destination := Dest} <- sets:to_list(Bindings)];
+ [] ->
+ []
+ end.
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl
index 4fd7a76286..3d464f22be 100644
--- a/deps/rabbit/src/rabbit_vhost.erl
+++ b/deps/rabbit/src/rabbit_vhost.erl
@@ -580,8 +580,7 @@ exists_in_mnesia(VHost) ->
mnesia:dirty_read({rabbit_vhost, VHost}) /= [].
exists_in_khepri(VHost) ->
- Path = khepri_vhost_path(VHost),
- rabbit_khepri:exists(Path).
+ ets:member(rabbit_khepri_vhost, VHost).
-spec list_names() -> [vhost:name()].
list_names() ->
@@ -593,11 +592,7 @@ list_names_in_mnesia() ->
mnesia:dirty_all_keys(rabbit_vhost).
list_names_in_khepri() ->
- Path = khepri_vhosts_path(),
- case rabbit_khepri:list_child_nodes(Path) of
- {ok, Result} -> Result;
- _ -> []
- end.
+ ets:select(rabbit_khepri_vhost, [{vhost:pattern_match_names(), [], ['$1']}]).
%% Exists for backwards compatibility, prefer list_names/0.
-spec list() -> [vhost:name()].
@@ -613,11 +608,7 @@ all_in_mnesia() ->
mnesia:dirty_match_object(rabbit_vhost, vhost:pattern_match_all()).
all_in_khepri() ->
- Path = khepri_vhosts_path(),
- case rabbit_khepri:list(Path) of
- {ok, VHosts} -> maps:values(VHosts);
- _ -> []
- end.
+ ets:tab2list(rabbit_khepri_vhost).
-spec all_tagged_with(atom()) -> [vhost:vhost()].
all_tagged_with(TagName) ->
@@ -655,10 +646,9 @@ lookup_in_mnesia(VHostName) ->
end.
lookup_in_khepri(VHostName) ->
- Path = khepri_vhost_path(VHostName),
- case rabbit_khepri:get(Path) of
- {ok, Record} -> Record;
- _ -> {error, {no_such_vhost, VHostName}}
+ case ets:lookup(rabbit_khepri_vhost, VHostName) of
+ [Record] -> Record;
+ _ -> {error, {no_such_vhost, VHostName}}
end.
with_in_mnesia(VHostName, Thunk) ->
@@ -869,10 +859,9 @@ info_in_mnesia(Key) ->
end.
info_in_khepri(Key) ->
- Path = khepri_vhost_path(Key),
- case rabbit_khepri:get(Path) of
- {ok, VHost} -> infos(?INFO_KEYS, VHost);
- _ -> []
+ case ets:lookup(rabbit_khepri_vhost, Key) of
+ [VHost] -> infos(?INFO_KEYS, VHost);
+ _ -> []
end.
-spec info(vhost:vhost(), rabbit_types:info_keys()) -> rabbit_types:infos().
diff --git a/deps/rabbit/src/vhost.erl b/deps/rabbit/src/vhost.erl
index cdd3ba3ce5..fb0cb99c6d 100644
--- a/deps/rabbit/src/vhost.erl
+++ b/deps/rabbit/src/vhost.erl
@@ -20,6 +20,7 @@
upgrade/1,
upgrade_to/2,
pattern_match_all/0,
+ pattern_match_names/0,
get_name/1,
get_limits/1,
get_metadata/1,
@@ -46,7 +47,7 @@
-record(vhost, {
%% name as a binary
- virtual_host :: name() | '_',
+ virtual_host :: name() | '_' | '$1',
%% proplist of limits configured, if any
limits :: list() | '_',
metadata :: metadata() | '_'
@@ -60,7 +61,7 @@
-type vhost_pattern() :: vhost_v2_pattern().
-type vhost_v2_pattern() :: #vhost{
- virtual_host :: name() | '_',
+ virtual_host :: name() | '_' | '$1',
limits :: '_',
metadata :: '_'
}.
@@ -112,10 +113,13 @@ info_keys() ->
cluster_state].
-spec pattern_match_all() -> vhost_pattern().
-
pattern_match_all() ->
#vhost{_ = '_'}.
+-spec pattern_match_names() -> vhost_pattern().
+pattern_match_names() ->
+ #vhost{virtual_host = '$1', _ = '_'}.
+
-spec get_name(vhost()) -> name().
get_name(#vhost{virtual_host = Value}) -> Value.
diff --git a/workspace_helpers.bzl b/workspace_helpers.bzl
index 625b10ed8e..45b6e38932 100644
--- a/workspace_helpers.bzl
+++ b/workspace_helpers.bzl
@@ -125,8 +125,8 @@ def rabbitmq_external_deps(rabbitmq_workspace = "@rabbitmq-server"):
github_erlang_app(
name = "khepri",
org = "rabbitmq",
- ref = "main",
- version = "main",
+ ref = "b7f9bac94857078a0b0584aa72a920f7596a1e49",
+ version = "b7f9bac94857078a0b0584aa72a920f7596a1e49",
build_file = rabbitmq_workspace + "//:BUILD.khepri",
)
@@ -180,11 +180,16 @@ sed -i"_orig" -E '/VERSION/ s/[0-9]+\\.[0-9]+\\.[0-9]+/'${VERSION}'/' BUILD.baze
sha256 = "282a8a323ca2a845c9e6f787d166348f776c1d4a41ede63046d72d422e3da946",
)
- git_repository(
+ hex_pm_erlang_app(
name = "ra",
- branch = "main",
- remote = "https://github.com/rabbitmq/ra.git",
- patch_cmds = [RA_INJECT_GIT_VERSION],
+ version = "2.4.0",
+ sha256 = "f716146ee1755823fbff741669053efc76e241caa95465782fc4c0a4389eb40c",
+ deps = [
+ "@gen_batch_server//:erlang_app",
+ ],
+ runtime_deps = [
+ "@aten//:erlang_app",
+ ]
)
hex_archive(