summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Parra Corbacho <dparracorbac@vmware.com>2022-12-21 12:34:29 +0100
committerDiana Parra Corbacho <dparracorbac@vmware.com>2022-12-21 12:34:29 +0100
commit354e333530b7c12624ddf268736147437147195f (patch)
tree268668c5a1002ca95ffe651c54f633cb68575d9d
parentf20eede7cde46ba82f95e0bd0fc8c59a0380a193 (diff)
downloadrabbitmq-server-git-khepri-queues.tar.gz
Khepri: rename rabbit_store to rabbit_dbkhepri-queues
-rw-r--r--deps/rabbit/src/rabbit.erl4
-rw-r--r--deps/rabbit/src/rabbit_core_ff.erl6
-rw-r--r--deps/rabbit/src/rabbit_db.erl (renamed from deps/rabbit/src/rabbit_store.erl)5
-rw-r--r--deps/rabbit/src/rabbit_db_binding.erl38
-rw-r--r--deps/rabbit/src/rabbit_db_exchange.erl22
-rw-r--r--deps/rabbit/src/rabbit_db_msup.erl2
-rw-r--r--deps/rabbit/src/rabbit_db_queue.erl32
-rw-r--r--deps/rabbit/src/rabbit_db_topic_exchange.erl4
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_misc.erl2
9 files changed, 56 insertions, 59 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index e1ab768d0f..b217a89228 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -64,13 +64,13 @@
{enables, external_infrastructure}]}).
-rabbit_boot_step({database,
- [{mfa, {rabbit_store, init, []}},
+ [{mfa, {rabbit_db, init, []}},
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({database_sync,
[{description, "database sync"},
- {mfa, {rabbit_store, sync, []}},
+ {mfa, {rabbit_db, sync, []}},
{requires, database},
{enables, external_infrastructure}]}).
diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl
index 8c484f7cd2..e003e53f3e 100644
--- a/deps/rabbit/src/rabbit_core_ff.erl
+++ b/deps/rabbit/src/rabbit_core_ff.erl
@@ -149,7 +149,7 @@
Args :: rabbit_feature_flags:enable_callback_args(),
Ret :: rabbit_feature_flags:enable_callback_ret().
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
- case rabbit_store:is_migration_done(raft_based_metadata_store_phase1) of
+ case rabbit_db:is_migration_done(raft_based_metadata_store_phase1) of
true ->
%% The routing shortcut should be disabled after enabling Khepri
ok;
@@ -316,7 +316,7 @@ mds_phase1_migration_post_enable(#{feature_name := FeatureName}) ->
%% data from previous migrations when a node joins an existing cluster.
%% If the migration has never been completed, then it's fair to remove
%% the existing data in Khepri.
- rabbit_store:set_migration_flag(FeatureName).
+ rabbit_db:set_migration_flag(FeatureName).
mds_migration_enable(FeatureName, TablesAndOwners) ->
case ensure_khepri_cluster_matches_mnesia(FeatureName) of
@@ -502,7 +502,7 @@ migrate_tables_to_khepri_run(FeatureName, TablesAndOwners) ->
"Feature flag `~s`: clear data from any aborted migration attempts "
"(if any)",
[FeatureName]),
- case rabbit_store:is_migration_done(FeatureName) of
+ case rabbit_db:is_migration_done(FeatureName) of
%% This flag is necessary to skip clearing
%% data from previous migrations when a node joins an existing cluster.
%% If the migration has never been completed, then it's fair to remove
diff --git a/deps/rabbit/src/rabbit_store.erl b/deps/rabbit/src/rabbit_db.erl
index 1abcd60eb2..e848ed4650 100644
--- a/deps/rabbit/src/rabbit_store.erl
+++ b/deps/rabbit/src/rabbit_db.erl
@@ -5,12 +5,9 @@
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
--module(rabbit_store).
+-module(rabbit_db).
-include_lib("khepri/include/khepri.hrl").
--include_lib("rabbit_common/include/rabbit.hrl").
--include_lib("rabbit_common/include/rabbit_framing.hrl").
--include("amqqueue.hrl").
-export([init/0, sync/0]).
-export([set_migration_flag/1, is_migration_done/1]).
diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl
index d7e2053fd1..f746499b93 100644
--- a/deps/rabbit/src/rabbit_db_binding.erl
+++ b/deps/rabbit/src/rabbit_db_binding.erl
@@ -151,13 +151,13 @@ get_all(VHost) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
- [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Match)]
+ [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Match)]
end,
fun() ->
- Path = khepri_routes_path() ++ [VHost, rabbit_store:if_has_data_wildcard()],
+ Path = khepri_routes_path() ++ [VHost, rabbit_db:if_has_data_wildcard()],
lists:foldl(fun(SetOfBindings, Acc) ->
sets:to_list(SetOfBindings) ++ Acc
- end, [], rabbit_store:list_in_khepri(Path))
+ end, [], rabbit_db:list_in_khepri(Path))
end).
-spec get_all_for_source(Src) -> [Binding] when
@@ -173,14 +173,14 @@ get_all_for_source(#resource{virtual_host = VHost, name = Name} = Resource) ->
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
Route = #route{binding = #binding{source = Resource, _ = '_'}},
- [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Route)]
+ [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Route)]
end,
fun() ->
Path = khepri_routes_path() ++ [VHost, Name,
- rabbit_store:if_has_data_wildcard()],
+ rabbit_db:if_has_data_wildcard()],
lists:foldl(fun(SetOfBindings, Acc) ->
sets:to_list(SetOfBindings) ++ Acc
- end, [], rabbit_store:list_in_khepri(Path))
+ end, [], rabbit_db:list_in_khepri(Path))
end).
-spec get_all_for_destination(Dst) -> [Binding] when
@@ -200,14 +200,14 @@ get_all_for_destination(#resource{virtual_host = VHost, name = Name,
Route = rabbit_binding:reverse_route(#route{binding = #binding{destination = Resource,
_ = '_'}}),
[rabbit_binding:reverse_binding(B) ||
- #reverse_route{reverse_binding = B} <- rabbit_store:list_in_mnesia(rabbit_reverse_route, Route)]
+ #reverse_route{reverse_binding = B} <- rabbit_db:list_in_mnesia(rabbit_reverse_route, Route)]
end,
fun() ->
Path = khepri_routes_path() ++ [VHost, ?KHEPRI_WILDCARD_STAR, Kind, Name,
- rabbit_store:if_has_data_wildcard()],
+ rabbit_db:if_has_data_wildcard()],
lists:foldl(fun(SetOfBindings, Acc) ->
sets:to_list(SetOfBindings) ++ Acc
- end, [], rabbit_store:list_in_khepri(Path))
+ end, [], rabbit_db:list_in_khepri(Path))
end).
-spec get_all(Src, Dst) -> [Binding] when
@@ -227,7 +227,7 @@ get_all(SrcName, DstName) ->
Route = #route{binding = #binding{source = SrcName,
destination = DstName,
_ = '_'}},
- [B || #route{binding = B} <- rabbit_store:list_in_mnesia(rabbit_route, Route)]
+ [B || #route{binding = B} <- rabbit_db:list_in_mnesia(rabbit_route, Route)]
end,
fun() ->
Values = get_all_in_khepri(SrcName, DstName),
@@ -257,7 +257,7 @@ get_all_explicit() ->
fun() ->
Condition = #if_not{condition = #if_name_matches{regex = "^$"}},
Path = khepri_routes_path() ++ [?KHEPRI_WILDCARD_STAR, Condition,
- rabbit_store:if_has_data_wildcard()],
+ rabbit_db:if_has_data_wildcard()],
{ok, Data} = rabbit_khepri:match(Path),
lists:foldl(fun(SetOfBindings, Acc) ->
sets:to_list(SetOfBindings) ++ Acc
@@ -285,7 +285,7 @@ fold(Fun, Acc) ->
end,
fun() ->
Path = khepri_routes_path() ++ [?KHEPRI_WILDCARD_STAR,
- rabbit_store:if_has_data_wildcard()],
+ rabbit_db:if_has_data_wildcard()],
{ok, Res} = rabbit_khepri:fold(
Path,
fun(_, #{data := SetOfBindings}, Acc0) ->
@@ -408,7 +408,7 @@ has_for_source_in_mnesia(SrcName) ->
contains(rabbit_semi_durable_route, Match).
has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
- Path = khepri_routes_path() ++ [VHost, Name, rabbit_store:if_has_data_wildcard()],
+ Path = khepri_routes_path() ++ [VHost, Name, rabbit_db:if_has_data_wildcard()],
case khepri_tx:get_many(Path) of
{ok, Map} ->
maps:size(Map) > 0;
@@ -418,7 +418,7 @@ has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
match_source_and_destination_in_khepri_tx(#resource{virtual_host = VHost, name = Name},
#resource{kind = Kind, name = DstName}) ->
- Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, rabbit_store:if_has_data_wildcard()],
+ Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, rabbit_db:if_has_data_wildcard()],
case khepri_tx:get_many(Path) of
{ok, Map} -> maps:values(Map);
_ -> []
@@ -511,7 +511,7 @@ lookup_resource_in_khepri_tx(#resource{kind = exchange} = Name) ->
rabbit_db_exchange:get_in_khepri_tx(Name).
match_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
- Path = khepri_routes_path() ++ [VHost, Name, rabbit_store:if_has_data_wildcard()],
+ Path = khepri_routes_path() ++ [VHost, Name, rabbit_db:if_has_data_wildcard()],
{ok, Map} = rabbit_khepri:match(Path),
Map.
@@ -521,7 +521,7 @@ match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name =
Map.
match_source_and_key_in_khepri(Src, ['_']) ->
- Path = khepri_routing_path(Src, rabbit_store:if_has_data_wildcard()),
+ Path = khepri_routing_path(Src, rabbit_db:if_has_data_wildcard()),
case rabbit_khepri:match(Path) of
{ok, Map} ->
maps:fold(fun(_, Dsts, Acc) ->
@@ -647,8 +647,8 @@ populate_index_route_table_in_mnesia() ->
get_all_in_khepri(#resource{virtual_host = VHost, name = Name},
#resource{kind = Kind, name = DstName}) ->
Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName,
- rabbit_store:if_has_data_wildcard()],
- rabbit_store:list_in_khepri(Path).
+ rabbit_db:if_has_data_wildcard()],
+ rabbit_db:list_in_khepri(Path).
delete_in_mnesia(Binding, ChecksFun) ->
binding_action_in_mnesia(
@@ -764,7 +764,7 @@ delete_for_source_in_mnesia(SrcName, ShouldIndexTable) ->
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
Path = khepri_routes_path() ++ [VHost, Name],
- {ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_store:if_has_data_wildcard()]),
+ {ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_db:if_has_data_wildcard()]),
ok = khepri_tx:delete(Path),
maps:fold(fun(_, Set, Acc) ->
sets:to_list(Set) ++ Acc
diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl
index ae150a3dd0..188be15e38 100644
--- a/deps/rabbit/src/rabbit_db_exchange.erl
+++ b/deps/rabbit/src/rabbit_db_exchange.erl
@@ -65,10 +65,10 @@
get_all() ->
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_exchange, #exchange{_ = '_'})
+ rabbit_db:list_in_mnesia(rabbit_exchange, #exchange{_ = '_'})
end,
fun() ->
- rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()])
end).
-spec get_all(VHostName) -> [Exchange] when
@@ -100,10 +100,10 @@ get_all(VHost) ->
get_all_durable() ->
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_durable_exchange, #exchange{_ = '_'})
+ rabbit_db:list_in_mnesia(rabbit_durable_exchange, #exchange{_ = '_'})
end,
fun() ->
- rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()])
end).
%% -------------------------------------------------------------------
@@ -124,7 +124,7 @@ list() ->
mnesia:dirty_all_keys(rabbit_exchange)
end,
fun() ->
- case rabbit_khepri:match(khepri_exchanges_path() ++ [rabbit_store:if_has_data_wildcard()]) of
+ case rabbit_khepri:match(khepri_exchanges_path() ++ [rabbit_db:if_has_data_wildcard()]) of
{ok, Map} ->
maps:fold(fun(_K, X, Acc) -> [X#exchange.name | Acc] end, [], Map);
_ ->
@@ -223,7 +223,7 @@ update(#resource{virtual_host = VHost, name = Name} = XName, Fun) ->
end,
fun() ->
Path = khepri_exchange_path(XName),
- rabbit_store:retry(
+ rabbit_db:retry(
fun () ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := X, payload_version := Vsn}} ->
@@ -346,7 +346,7 @@ next_serial(#exchange{name = #resource{name = Name, virtual_host = VHost} = XNam
fun() ->
%% Just storing the serial number is enough, no need to keep #exchange_serial{}
Path = khepri_exchange_serial_path(XName),
- rabbit_store:retry(
+ rabbit_db:retry(
fun() ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Serial,
@@ -488,7 +488,7 @@ match(Pattern0) ->
fun() ->
%% TODO error handling?
Pattern = #if_data_matches{pattern = Pattern0},
- rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR, Pattern])
+ rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR, Pattern])
end).
%% -------------------------------------------------------------------
@@ -593,10 +593,10 @@ get_in_khepri_tx(Name) ->
list_exchanges_in_mnesia(VHost) ->
Match = #exchange{name = rabbit_misc:r(VHost, exchange), _ = '_'},
- rabbit_store:list_in_mnesia(rabbit_exchange, Match).
+ rabbit_db:list_in_mnesia(rabbit_exchange, Match).
list_exchanges_in_khepri(VHost) ->
- rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_store:if_has_data_wildcard()]).
+ rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_db:if_has_data_wildcard()]).
peek_serial_in_mnesia(XName, LockType) ->
case mnesia:read(rabbit_exchange_serial, XName, LockType) of
@@ -766,7 +766,7 @@ recover_in_mnesia(VHost) ->
recover_in_khepri(VHost) ->
%% Transient exchanges are deprecated in Khepri, all exchanges are recovered
- Exchanges0 = rabbit_store:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_store:if_has_data_wildcard()],
+ Exchanges0 = rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_db:if_has_data_wildcard()],
#{timeout => infinity}),
Exchanges = [rabbit_exchange_decorator:set(X) || X <- Exchanges0],
diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl
index 313e7b8094..60029c2cfe 100644
--- a/deps/rabbit/src/rabbit_db_msup.erl
+++ b/deps/rabbit/src/rabbit_db_msup.erl
@@ -173,7 +173,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} =
S = #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Overall,
childspec = ChildSpec},
- rabbit_store:retry(
+ rabbit_db:retry(
fun() ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl
index 5676198799..54b57194a9 100644
--- a/deps/rabbit/src/rabbit_db_queue.erl
+++ b/deps/rabbit/src/rabbit_db_queue.erl
@@ -53,7 +53,7 @@
%% HA queues are removed it can be deleted.
-export([internal_delete/3]).
-%% Used by other rabbit_db_* modules and rabbit_store
+%% Used by other rabbit_db_* modules
-export([
update_in_mnesia/2,
update_in_khepri/2,
@@ -82,12 +82,12 @@ get_all() ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> list_with_possible_retry_in_mnesia(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_queue, amqqueue:pattern_match_all())
+ rabbit_db:list_in_mnesia(rabbit_queue, amqqueue:pattern_match_all())
end)
end,
fun() -> list_with_possible_retry_in_khepri(
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()])
end)
end).
@@ -127,12 +127,12 @@ get_all_durable() ->
rabbit_khepri:try_mnesia_or_khepri(
fun() -> list_with_possible_retry_in_mnesia(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_durable_queue, amqqueue:pattern_match_all())
+ rabbit_db:list_in_mnesia(rabbit_durable_queue, amqqueue:pattern_match_all())
end)
end,
fun() -> list_with_possible_retry_in_khepri(
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()])
end)
end).
@@ -151,12 +151,12 @@ get_all_durable(VHost) ->
fun() -> list_with_possible_retry_in_mnesia(
fun() ->
Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)),
- rabbit_store:list_in_mnesia(rabbit_durable_queue, Pattern)
+ rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern)
end)
end,
fun() -> list_with_possible_retry_in_khepri(
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_db:if_has_data_wildcard()])
end)
end).
@@ -164,10 +164,10 @@ get_all_durable_by_type(Type) ->
Pattern = amqqueue:pattern_match_on_type(Type),
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_durable_queue, Pattern)
+ rabbit_db:list_in_mnesia(rabbit_durable_queue, Pattern)
end,
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = amqqueue:pattern_match_all()}])])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = amqqueue:pattern_match_all()}])])
end).
list() ->
@@ -176,7 +176,7 @@ list() ->
mnesia:dirty_all_keys(rabbit_queue)
end,
fun() ->
- case rabbit_khepri:match(khepri_queues_path() ++ [rabbit_store:if_has_data_wildcard()]) of
+ case rabbit_khepri:match(khepri_queues_path() ++ [rabbit_db:if_has_data_wildcard()]) of
{ok, Map} ->
maps:fold(fun(_K, Q, Acc) -> [amqqueue:get_name(Q) | Acc] end, [], Map);
_ ->
@@ -271,7 +271,7 @@ update(#resource{virtual_host = VHost, name = Name} = QName, Fun) ->
end,
fun() ->
Path = khepri_queue_path(QName),
- rabbit_store:retry(
+ rabbit_db:retry(
fun() ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Q, payload_version := Vsn}} ->
@@ -312,10 +312,10 @@ get_all_by_type(Type) ->
Pattern = amqqueue:pattern_match_on_type(Type),
rabbit_khepri:try_mnesia_or_khepri(
fun() ->
- rabbit_store:list_in_mnesia(rabbit_queue, Pattern)
+ rabbit_db:list_in_mnesia(rabbit_queue, Pattern)
end,
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [rabbit_store:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}])])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [rabbit_db:if_has_data([?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}])])
end).
create_or_get(DurableQ, Q) ->
@@ -506,13 +506,13 @@ get_all_in_mnesia(VHost) ->
list_with_possible_retry_in_mnesia(
fun() ->
Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHost, queue)),
- rabbit_store:list_in_mnesia(rabbit_queue, Pattern)
+ rabbit_db:list_in_mnesia(rabbit_queue, Pattern)
end).
get_all_in_khepri(VHost) ->
list_with_possible_retry_in_khepri(
fun() ->
- rabbit_store:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_store:if_has_data_wildcard()])
+ rabbit_db:list_in_khepri(khepri_queues_path() ++ [VHost, rabbit_db:if_has_data_wildcard()])
end).
not_found_or_absent_queue_dirty_in_mnesia(Name) ->
@@ -657,7 +657,7 @@ update_decorators_in_khepri(#resource{virtual_host = VHost, name = Name} = QName
%% Decorators have just been calculated on `rabbit_queue_decorator:maybe_recover/1`, thus
%% we can update them here directly.
Path = khepri_queue_path(QName),
- rabbit_store:retry(
+ rabbit_db:retry(
fun() ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Q0, payload_version := Vsn}} ->
diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl
index 88eff021bf..9a27057009 100644
--- a/deps/rabbit/src/rabbit_db_topic_exchange.erl
+++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl
@@ -176,7 +176,7 @@ add_topic_trie_binding_in_khepri(XName, RoutingKey, Destination, Args) ->
Path = khepri_exchange_type_topic_path(XName) ++ split_topic_key_binary(RoutingKey),
{Path0, [Last]} = lists:split(length(Path) - 1, Path),
Binding = #{destination => Destination, arguments => Args},
- rabbit_store:retry(
+ rabbit_db:retry(
fun() ->
case rabbit_khepri:adv_get(Path) of
{ok, #{data := Set0, payload_version := Vsn}} ->
@@ -202,7 +202,7 @@ add_topic_trie_binding_tx(XName, RoutingKey, Destination, Args) ->
ok = khepri_tx:put(Path, Set).
route_delivery_for_exchange_type_topic_in_khepri(XName, RoutingKey) ->
- Root = khepri_exchange_type_topic_path(XName) ++ [rabbit_store:if_has_data_wildcard()],
+ Root = khepri_exchange_type_topic_path(XName) ++ [rabbit_db:if_has_data_wildcard()],
case rabbit_khepri:fold(
Root,
fun(Path0, #{data := Set}, Acc) ->
diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl
index 8b9840bc47..b502ef6870 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl
@@ -164,7 +164,7 @@ remove_from_queue_in_khepri(QueueName, Self, DeadGMPids) ->
fun () ->
%% Someone else could have deleted the queue before we
%% get here. Or, gm group could've altered. see rabbitmq-server#914
- case rabbit_store:lookup_queue_in_khepri_tx(QueueName) of
+ case rabbit_db:get_in_khepri_tx(QueueName) of
[] -> {error, not_found};
[Q0] when ?is_amqqueue(Q0) ->
QPid = amqqueue:get_pid(Q0),