diff options
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_core_ff.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db.erl (renamed from deps/rabbit/src/rabbit_store.erl) | 5 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db_binding.erl | 38 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db_exchange.erl | 22 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db_msup.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db_queue.erl | 32 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_db_topic_exchange.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_misc.erl | 2 |
9 files changed, 56 insertions, 59 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index e1ab768d0f..b217a89228 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -64,13 +64,13 @@ {enables, external_infrastructure}]}). -rabbit_boot_step({database, - [{mfa, {rabbit_store, init, []}}, + [{mfa, {rabbit_db, init, []}}, {requires, file_handle_cache}, {enables, external_infrastructure}]}). -rabbit_boot_step({database_sync, [{description, "database sync"}, - {mfa, {rabbit_store, sync, []}}, + {mfa, {rabbit_db, sync, []}}, {requires, database}, {enables, external_infrastructure}]}). diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 8c484f7cd2..e003e53f3e 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -149,7 +149,7 @@ Args :: rabbit_feature_flags:enable_callback_args(), Ret :: rabbit_feature_flags:enable_callback_ret(). direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) -> - case rabbit_store:is_migration_done(raft_based_metadata_store_phase1) of + case rabbit_db:is_migration_done(raft_based_metadata_store_phase1) of true -> %% The routing shortcut should be disabled after enabling Khepri ok; @@ -316,7 +316,7 @@ mds_phase1_migration_post_enable(#{feature_name := FeatureName}) -> %% data from previous migrations when a node joins an existing cluster. %% If the migration has never been completed, then it's fair to remove %% the existing data in Khepri. - rabbit_store:set_migration_flag(FeatureName). + rabbit_db:set_migration_flag(FeatureName). mds_migration_enable(FeatureName, TablesAndOwners) -> case ensure_khepri_cluster_matches_mnesia(FeatureName) of @@ -502,7 +502,7 @@ migrate_tables_to_khepri_run(FeatureName, TablesAndOwners) -> "Feature flag `~s`: clear data from any aborted migration attempts " "(if any)", [FeatureName]), - case rabbit_store:is_migration_done(FeatureName) of + case rabbit_db:is_migration_done(FeatureName) of %% This flag is necessary to skip clearing %% data from previous migrations when a node joins an existing cluster. %% If the migration has never been completed, then it's fair to remove diff --git a/deps/rabbit/src/rabbit_store.erl b/deps/rabbit/src/rabbit_db.erl index 1abcd60eb2..e848ed4650 100644 --- a/deps/rabbit/src/rabbit_store.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -5,12 +5,9 @@ %% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. %% --module(rabbit_store). +-module(rabbit_db). -include_lib("khepri/include/khepri.hrl"). --include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit_common/include/rabbit_framing.hrl"). --include("amqqueue.hrl"). -export([init/0, sync/0]). -export([set_migration_flag/1, is_migration_done/1]). diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index d7e2053fd1..f746499b93 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -151,13 +151,13 @@ get_all(VHost) -> destination = VHostResource, _ = '_'}, _ = '_'}, - [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Match)] + [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Match)] end, fun() -> - Path = khepri_routes_path() ++ [VHost, rabbit_store:if_has_data_wildcard()], + Path = khepri_routes_path() ++ [VHost, rabbit_db:if_has_data_wildcard()], lists:foldl(fun(SetOfBindings, Acc) -> sets:to_list(SetOfBindings) ++ Acc - end, [], rabbit_store:list_in_khepri(Path)) + end, [], rabbit_db:list_in_khepri(Path)) end). -spec get_all_for_source(Src) -> [Binding] when @@ -173,14 +173,14 @@ get_all_for_source(#resource{virtual_host = VHost, name = Name} = Resource) -> rabbit_khepri:try_mnesia_or_khepri( fun() -> Route = #route{binding = #binding{source = Resource, _ = '_'}}, - [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Route)] + [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Route)] end, fun() -> Path = khepri_routes_path() ++ [VHost, Name, - rabbit_store:if_has_data_wildcard()], + rabbit_db:if_has_data_wildcard()], lists:foldl(fun(SetOfBindings, Acc) -> sets:to_list(SetOfBindings) ++ Acc - end, [], rabbit_store:list_in_khepri(Path)) + end, [], rabbit_db:list_in_khepri(Path)) end). -spec get_all_for_destination(Dst) -> [Binding] when @@ -200,14 +200,14 @@ get_all_for_destination(#resource{virtual_host = VHost, name = Name, Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Resource, _ = '_'}}), [rabbit_binding:reverse_binding(B) || - #reverse_route{reverse_binding = B} <- rabbit_store:list_in_mnesia(rabbit_reverse_route, Route)] + #reverse_route{reverse_binding = B} <- rabbit_db:list_in_mnesia(rabbit_reverse_route, Route)] end, fun() -> Path = khepri_routes_path() ++ [VHost, ?KHEPRI_WILDCARD_STAR, Kind, Name, - rabbit_store:if_has_data_wildcard()], + rabbit_db:if_has_data_wildcard()], lists:foldl(fun(SetOfBindings, Acc) -> sets:to_list(SetOfBindings) ++ Acc - end, [], rabbit_store:list_in_khepri(Path)) + end, [], rabbit_db:list_in_khepri(Path)) end). -spec get_all(Src, Dst) -> [Binding] when @@ -227,7 +227,7 @@ get_all(SrcName, DstName) -> Route = #route{binding = #binding{source = SrcName, destination = DstName, _ = '_'}}, - [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Route)] + [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Route)] end, fun() -> Values = get_all_in_khepri(SrcName, DstName), @@ -257,7 +257,7 @@ get_all_explicit() -> fun() -> Condition = #if_not{condition = #if_name_matches{regex = "^$"}}, Path = khepri_routes_path() ++ [?KHEPRI_WILDCARD_STAR, Condition, - rabbit_store:if_has_data_wildcard()], + rabbit_db:if_has_data_wildcard()], {ok, Data} = rabbit_khepri:match(Path), lists:foldl(fun(SetOfBindings, Acc) -> sets:to_list(SetOfBindings) ++ Acc @@ -285,7 +285,7 @@ fold(Fun, Acc) -> end, fun() -> Path = khepri_routes_path() ++ [?KHEPRI_WILDCARD_STAR, - rabbit_store:if_has_data_wildcard()], + rabbit_db:if_has_data_wildcard()], {ok, Res} = rabbit_khepri:fold( Path, fun(_, #{data := SetOfBindings}, Acc0) -> @@ -408,7 +408,7 @@ has_for_source_in_mnesia(SrcName) -> contains(rabbit_semi_durable_route, Match). has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> - Path = khepri_routes_path() ++ [VHost, Name, rabbit_store:if_has_data_wildcard()], + Path = khepri_routes_path() ++ [VHost, Name, rabbit_db:if_has_data_wildcard()], case khepri_tx:get_many(Path) of {ok, Map} -> maps:size(Map) > 0; @@ -418,7 +418,7 @@ has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> match_source_and_destination_in_khepri_tx(#resource{virtual_host = VHost, name = Name}, #resource{kind = Kind, name = DstName}) -> - Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, rabbit_store:if_has_data_wildcard()], + Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, rabbit_db:if_has_data_wildcard()], case khepri_tx:get_many(Path) of {ok, Map} -> maps:values(Map); _ -> [] @@ -511,7 +511,7 @@ lookup_resource_in_khepri_tx(#resource{kind = exchange} = Name) -> rabbit_db_exchange:get_in_khepri_tx(Name). match_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> - Path = khepri_routes_path() ++ [VHost, Name, rabbit_store:if_has_data_wildcard()], + Path = khepri_routes_path() ++ [VHost, Name, rabbit_db:if_has_data_wildcard()], {ok, Map} = rabbit_khepri:match(Path), Map. @@ -521,7 +521,7 @@ match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Map. match_source_and_key_in_khepri(Src, ['_']) -> - Path = khepri_routing_path(Src, rabbit_store:if_has_data_wildcard()), + Path = khepri_routing_path(Src, rabbit_db:if_has_data_wildcard()), case rabbit_khepri:match(Path) of {ok, Map} -> maps:fold(fun(_, Dsts, Acc) -> @@ -647,8 +647,8 @@ populate_index_route_table_in_mnesia() -> get_all_in_khepri(#resource{virtual_host = VHost, name = Name}, #resource{kind = Kind, name = DstName}) -> Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, - rabbit_store:if_has_data_wildcard()], - rabbit_store:list_in_khepri(Path). + rabbit_db:if_has_data_wildcard()], + rabbit_db:list_in_khepri(Path). delete_in_mnesia(Binding, ChecksFun) -> binding_action_in_mnesia( @@ -764,7 +764,7 @@ delete_for_source_in_mnesia(SrcName, ShouldIndexTable) -> delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> Path = khepri_routes_path() ++ [VHost, Name], - {ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_store:if_has_data_wildcard()]), + {ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_db:if_has_data_wildcard()]), ok = khepri_tx:delete(Path), maps:fold(fun(_, Set, Acc) -> sets:to_list(Set) ++ Acc diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index ae150a3dd0..188be15e38 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -65,10 +65,10 @@ get_all() -> rabbit_khepri:try_mnesia_or_khepri( fun() -> - rabbit_store:list_in_mnesia(rabbit_exchange, #exchange{_ = '_'}) + rabbit_db:list_in_mnesia(rabbit_exchange, #exchange{_ = '_'}) end, fun() -> - rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()]) end). -spec get_all(VHostName) -> [Exchange] when @@ -100,10 +100,10 @@ get_all(VHost) -> get_all_durable() -> rabbit_khepri:try_mnesia_or_khepri( fun() -> - rabbit_store:list_in_mnesia(rabbit_durable_exchange, #exchange{_ = '_'}) + rabbit_db:list_in_mnesia(rabbit_durable_exchange, #exchange{_ = '_'}) end, fun() -> - rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()]) end). %% ------------------------------------------------------------------- @@ -124,7 +124,7 @@ list() -> mnesia:dirty_all_keys(rabbit_exchange) end, fun() -> - case rabbit_khepri:match(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()]) of + case rabbit_khepri:match(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()]) of {ok, Map} -> maps:fold(fun(_K, X, Acc) -> [X#exchange.name | Acc] end, [], Map); _ -> @@ -223,7 +223,7 @@ update(#resource{virtual_host = VHost, name = Name} = XName, Fun) -> end, fun() -> Path = khepri_exchange_path(XName), - rabbit_store:retry( + rabbit_db:retry( fun () -> case rabbit_khepri:adv_get(Path) of {ok, #{data := X, payload_version := Vsn}} -> @@ -346,7 +346,7 @@ next_serial(#exchange{name = #resource{name = Name, virtual_host = VHost} = XNam fun() -> %% Just storing the serial number is enough, no need to keep #exchange_serial{} Path = khepri_exchange_serial_path(XName), - rabbit_store:retry( + rabbit_db:retry( fun() -> case rabbit_khepri:adv_get(Path) of {ok, #{data := Serial, @@ -488,7 +488,7 @@ match(Pattern0) -> fun() -> %% TODO error handling? Pattern = #if_data_matches{pattern = Pattern0}, - rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR, Pattern]) + rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR, Pattern]) end). %% ------------------------------------------------------------------- @@ -593,10 +593,10 @@ get_in_khepri_tx(Name) -> list_exchanges_in_mnesia(VHost) -> Match = #exchange{name = rabbit_misc:r(VHost, exchange), _ = '_'}, - rabbit_store:list_in_mnesia(rabbit_exchange, Match). + rabbit_db:list_in_mnesia(rabbit_exchange, Match). list_exchanges_in_khepri(VHost) -> - rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_store:if_has_data_wildcard()]). + rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_db:if_has_data_wildcard()]). peek_serial_in_mnesia(XName, LockType) -> case mnesia:read(rabbit_exchange_serial, XName, LockType) of @@ -766,7 +766,7 @@ recover_in_mnesia(VHost) -> recover_in_khepri(VHost) -> %% Transient exchanges are deprecated in Khepri, all exchanges are recovered - Exchanges0 = rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_store:if_has_data_wildcard()], + Exchanges0 = rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_db:if_has_data_wildcard()], #{timeout => infinity}), Exchanges = [rabbit_exchange_decorator:set(X) || X <- Exchanges0], diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl index 313e7b8094..60029c2cfe 100644 --- a/deps/rabbit/src/rabbit_db_msup.erl +++ b/deps/rabbit/src/rabbit_db_msup.erl @@ -173,7 +173,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = S = #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Overall, childspec = ChildSpec}, - rabbit_store:retry( + rabbit_db:retry( fun() -> case rabbit_khepri:adv_get(Path) of {ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid}, diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 5676198799..54b57194a9 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -53,7 +53,7 @@ %% HA queues are removed it can be deleted. -export([internal_delete/3]). -%% Used by other rabbit_db_* modules and rabbit_store +%% Used by other rabbit_db_* modules -export([ update_in_mnesia/2, update_in_khepri/2, @@ -82,12 +82,12 @@ get_all() -> rabbit_khepri:try_mnesia_or_khepri( fun() -> list_with_possible_retry_in_mnesia( fun() -> - rabbit_store:list_in_mnesia(rabbit_queue, amqqueue:pattern_match_all()) + rabbit_db:list_in_mnesia(rabbit_queue, amqqueue:pattern_match_all()) end) end, fun() -> list_with_possible_retry_in_khepri( fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()]) end) end). @@ -127,12 +127,12 @@ get_all_durable() -> rabbit_khepri:try_mnesia_or_khepri( fun() -> list_with_possible_retry_in_mnesia( fun() -> - rabbit_store:list_in_mnesia(rabbit_durable_queue, amqqueue:pattern_match_all()) + rabbit_db:list_in_mnesia(rabbit_durable_queue, amqqueue:pattern_match_all()) end) end, fun() -> list_with_possible_retry_in_khepri( fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()]) end) end). @@ -151,12 +151,12 @@ get_all_durable(VHost) -> fun() -> list_with_possible_retry_in_mnesia( fun() -> Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)), - rabbit_store:list_in_mnesia(rabbit_durable_queue, Pattern) + rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern) end) end, fun() -> list_with_possible_retry_in_khepri( fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_db:if_has_data_wildcard()]) end) end). @@ -164,10 +164,10 @@ get_all_durable_by_type(Type) -> Pattern = amqqueue:pattern_match_on_type(Type), rabbit_khepri:try_mnesia_or_khepri( fun() -> - rabbit_store:list_in_mnesia(rabbit_durable_queue, Pattern) + rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern) end, fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = amqqueue:pattern_match_all()}])]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = amqqueue:pattern_match_all()}])]) end). list() -> @@ -176,7 +176,7 @@ list() -> mnesia:dirty_all_keys(rabbit_queue) end, fun() -> - case rabbit_khepri:match(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()]) of + case rabbit_khepri:match(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()]) of {ok, Map} -> maps:fold(fun(_K, Q, Acc) -> [amqqueue:get_name(Q) | Acc] end, [], Map); _ -> @@ -271,7 +271,7 @@ update(#resource{virtual_host = VHost, name = Name} = QName, Fun) -> end, fun() -> Path = khepri_queue_path(QName), - rabbit_store:retry( + rabbit_db:retry( fun() -> case rabbit_khepri:adv_get(Path) of {ok, #{data := Q, payload_version := Vsn}} -> @@ -312,10 +312,10 @@ get_all_by_type(Type) -> Pattern = amqqueue:pattern_match_on_type(Type), rabbit_khepri:try_mnesia_or_khepri( fun() -> - rabbit_store:list_in_mnesia(rabbit_queue, Pattern) + rabbit_db:list_in_mnesia(rabbit_queue, Pattern) end, fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}])]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}])]) end). create_or_get(DurableQ, Q) -> @@ -506,13 +506,13 @@ get_all_in_mnesia(VHost) -> list_with_possible_retry_in_mnesia( fun() -> Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)), - rabbit_store:list_in_mnesia(rabbit_queue, Pattern) + rabbit_db:list_in_mnesia(rabbit_queue, Pattern) end). get_all_in_khepri(VHost) -> list_with_possible_retry_in_khepri( fun() -> - rabbit_store:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_store:if_has_data_wildcard()]) + rabbit_db:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_db:if_has_data_wildcard()]) end). not_found_or_absent_queue_dirty_in_mnesia(Name) -> @@ -657,7 +657,7 @@ update_decorators_in_khepri(#resource{virtual_host = VHost, name = Name} = QName %% Decorators have just been calculated on `rabbit_queue_decorator:maybe_recover/1`, thus %% we can update them here directly. Path = khepri_queue_path(QName), - rabbit_store:retry( + rabbit_db:retry( fun() -> case rabbit_khepri:adv_get(Path) of {ok, #{data := Q0, payload_version := Vsn}} -> diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 88eff021bf..9a27057009 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -176,7 +176,7 @@ add_topic_trie_binding_in_khepri(XName, RoutingKey, Destination, Args) -> Path = khepri_exchange_type_topic_path(XName) ++ split_topic_key_binary(RoutingKey), {Path0, [Last]} = lists:split(length(Path) - 1, Path), Binding = #{destination => Destination, arguments => Args}, - rabbit_store:retry( + rabbit_db:retry( fun() -> case rabbit_khepri:adv_get(Path) of {ok, #{data := Set0, payload_version := Vsn}} -> @@ -202,7 +202,7 @@ add_topic_trie_binding_tx(XName, RoutingKey, Destination, Args) -> ok = khepri_tx:put(Path, Set). route_delivery_for_exchange_type_topic_in_khepri(XName, RoutingKey) -> - Root = khepri_exchange_type_topic_path(XName) ++ [rabbit_store:if_has_data_wildcard()], + Root = khepri_exchange_type_topic_path(XName) ++ [rabbit_db:if_has_data_wildcard()], case rabbit_khepri:fold( Root, fun(Path0, #{data := Set}, Acc) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl index 8b9840bc47..b502ef6870 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl @@ -164,7 +164,7 @@ remove_from_queue_in_khepri(QueueName, Self, DeadGMPids) -> fun () -> %% Someone else could have deleted the queue before we %% get here. Or, gm group could've altered. see rabbitmq-server#914 - case rabbit_store:lookup_queue_in_khepri_tx(QueueName) of + case rabbit_db:get_in_khepri_tx(QueueName) of [] -> {error, not_found}; [Q0] when ?is_amqqueue(Q0) -> QPid = amqqueue:get_pid(Q0), |