Skip to content

Commit 98c2cd0

Browse files
authored
Merge pull request #11795 from rabbitmq/unify-khepri-paths-api
Unify Khepri paths API
2 parents 21b9515 + 1383c0c commit 98c2cd0

26 files changed

+513
-442
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@
3232
delete_transient_for_destination_in_mnesia/1,
3333
has_for_source_in_mnesia/1,
3434
has_for_source_in_khepri/1,
35-
match_source_and_destination_in_khepri_tx/2
35+
match_source_and_destination_in_khepri_tx/2,
36+
clear_in_khepri/0
3637
]).
3738

3839
-export([
39-
khepri_route_path/1,
40-
khepri_routes_path/0,
40+
khepri_route_path/1, khepri_route_path/5,
41+
khepri_route_path_to_args/1,
4142
khepri_route_exchange_path/1
4243
]).
4344

@@ -610,9 +611,12 @@ fold_in_mnesia(Fun, Acc) ->
610611
end, Acc, ?MNESIA_TABLE).
611612

612613
fold_in_khepri(Fun, Acc) ->
613-
Path = khepri_routes_path() ++ [_VHost = ?KHEPRI_WILDCARD_STAR,
614-
_SrcName = ?KHEPRI_WILDCARD_STAR,
615-
rabbit_khepri:if_has_data_wildcard()],
614+
Path = khepri_route_path(
615+
_VHost = ?KHEPRI_WILDCARD_STAR,
616+
_SrcName = ?KHEPRI_WILDCARD_STAR,
617+
_Kind = ?KHEPRI_WILDCARD_STAR,
618+
_DstName = ?KHEPRI_WILDCARD_STAR,
619+
_RoutingKey = #if_has_data{}),
616620
{ok, Res} = rabbit_khepri:fold(
617621
Path,
618622
fun(_, #{data := SetOfBindings}, Acc0) ->
@@ -828,9 +832,14 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
828832
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
829833

830834
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
831-
Path = khepri_routes_path() ++ [VHost, Name],
832-
{ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_khepri:if_has_data_wildcard()]),
833-
ok = khepri_tx:delete(Path),
835+
Path = khepri_route_path(
836+
VHost,
837+
Name,
838+
_Kind = ?KHEPRI_WILDCARD_STAR,
839+
_DstName = ?KHEPRI_WILDCARD_STAR,
840+
_RoutingKey = #if_has_data{}),
841+
{ok, Bindings} = khepri_tx:get_many(Path),
842+
ok = khepri_tx:delete_many(Path),
834843
maps:fold(fun(_P, Set, Acc) ->
835844
sets:to_list(Set) ++ Acc
836845
end, [], Bindings).
@@ -885,7 +894,12 @@ delete_for_destination_in_khepri(DstName, OnlyDurable) ->
885894
lists:keysort(#binding.source, Bindings), OnlyDurable).
886895

887896
match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) ->
888-
Path = khepri_routes_path() ++ [VHost, ?KHEPRI_WILDCARD_STAR, Kind, Name, ?KHEPRI_WILDCARD_STAR_STAR],
897+
Path = khepri_route_path(
898+
VHost,
899+
_SrcName = ?KHEPRI_WILDCARD_STAR,
900+
Kind,
901+
Name,
902+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
889903
{ok, Map} = khepri_tx:get_many(Path),
890904
Map.
891905

@@ -926,7 +940,12 @@ has_for_source_in_mnesia(SrcName) ->
926940
-spec has_for_source_in_khepri(rabbit_types:binding_source()) -> boolean().
927941

928942
has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
929-
Path = khepri_routes_path() ++ [VHost, Name, rabbit_khepri:if_has_data_wildcard()],
943+
Path = khepri_route_path(
944+
VHost,
945+
Name,
946+
_Kind = ?KHEPRI_WILDCARD_STAR,
947+
_DstName = ?KHEPRI_WILDCARD_STAR,
948+
_RoutingKey = #if_has_data{}),
930949
case khepri_tx:get_many(Path) of
931950
{ok, Map} ->
932951
maps:size(Map) > 0;
@@ -945,7 +964,8 @@ has_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
945964

946965
match_source_and_destination_in_khepri_tx(#resource{virtual_host = VHost, name = Name},
947966
#resource{kind = Kind, name = DstName}) ->
948-
Path = khepri_routes_path() ++ [VHost, Name, Kind, DstName, rabbit_khepri:if_has_data_wildcard()],
967+
Path = khepri_route_path(
968+
VHost, Name, Kind, DstName, _RoutingKey = #if_has_data{}),
949969
case khepri_tx:get_many(Path) of
950970
{ok, Map} -> maps:values(Map);
951971
_ -> []
@@ -974,7 +994,12 @@ clear_in_mnesia() ->
974994
ok.
975995

976996
clear_in_khepri() ->
977-
Path = khepri_routes_path(),
997+
Path = khepri_route_path(
998+
_VHost = ?KHEPRI_WILDCARD_STAR,
999+
_SrcName = ?KHEPRI_WILDCARD_STAR,
1000+
_Kind = ?KHEPRI_WILDCARD_STAR,
1001+
_DstName = ?KHEPRI_WILDCARD_STAR,
1002+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
9781003
case rabbit_khepri:delete(Path) of
9791004
ok -> ok;
9801005
Error -> throw(Error)
@@ -983,13 +1008,44 @@ clear_in_khepri() ->
9831008
%% --------------------------------------------------------------
9841009
%% Paths
9851010
%% --------------------------------------------------------------
986-
khepri_route_path(#binding{source = #resource{virtual_host = VHost, name = SrcName},
987-
destination = #resource{kind = Kind, name = DstName},
988-
key = RoutingKey}) ->
1011+
1012+
khepri_route_path(
1013+
#binding{source = #resource{virtual_host = VHost, name = SrcName},
1014+
destination = #resource{kind = Kind, name = DstName},
1015+
key = RoutingKey}) ->
1016+
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey).
1017+
1018+
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
1019+
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
1020+
?IS_KHEPRI_PATH_CONDITION(SrcName) andalso
1021+
?IS_KHEPRI_PATH_CONDITION(Kind) andalso
1022+
?IS_KHEPRI_PATH_CONDITION(DstName) andalso
1023+
?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
9891024
[?MODULE, routes, VHost, SrcName, Kind, DstName, RoutingKey].
9901025

991-
khepri_routes_path() ->
992-
[?MODULE, routes].
1026+
khepri_route_path_to_args(Path) ->
1027+
Pattern = khepri_route_path(
1028+
'$VHost', '$SrcName', '$Kind', '$DstName', '$RoutingKey'),
1029+
khepri_route_path_to_args(Pattern, Path, #{}).
1030+
1031+
khepri_route_path_to_args([Var | Pattern], [Value | Path], Result)
1032+
when Var =:= '$VHost' orelse
1033+
Var =:= '$SrcName' orelse
1034+
Var =:= '$Kind' orelse
1035+
Var =:= '$DstName' orelse
1036+
Var =:= '$RoutingKey' ->
1037+
Result1 = Result#{Var => Value},
1038+
khepri_route_path_to_args(Pattern, Path, Result1);
1039+
khepri_route_path_to_args([Comp | Pattern], [Comp | Path], Result) ->
1040+
khepri_route_path_to_args(Pattern, Path, Result);
1041+
khepri_route_path_to_args(
1042+
[], _,
1043+
#{'$VHost' := VHost,
1044+
'$SrcName' := SrcName,
1045+
'$Kind' := Kind,
1046+
'$DstName' := DstName,
1047+
'$RoutingKey' := RoutingKey}) ->
1048+
{VHost, SrcName, Kind, DstName, RoutingKey}.
9931049

9941050
khepri_route_exchange_path(#resource{virtual_host = VHost, name = SrcName}) ->
9951051
[?MODULE, routes, VHost, SrcName].

deps/rabbit/src/rabbit_db_binding_m2k_converter.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-behaviour(mnesia_to_khepri_converter).
1111

1212
-include_lib("kernel/include/logger.hrl").
13+
-include_lib("khepri/include/khepri.hrl").
1314
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
1415
-include_lib("rabbit_common/include/rabbit.hrl").
1516

@@ -111,8 +112,4 @@ delete_from_khepri(rabbit_route = Table, Key, State) ->
111112
Table :: atom().
112113

113114
clear_data_in_khepri(rabbit_route) ->
114-
Path = rabbit_db_binding:khepri_routes_path(),
115-
case rabbit_khepri:delete(Path) of
116-
ok -> ok;
117-
Error -> throw(Error)
118-
end.
115+
rabbit_db_binding:clear_in_khepri().

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,16 @@
4141
get_in_khepri_tx/1,
4242
update_in_mnesia_tx/2,
4343
update_in_khepri_tx/2,
44-
path/1
44+
clear_exchanges_in_khepri/0,
45+
clear_exchange_serials_in_khepri/0
4546
]).
4647

4748
%% For testing
4849
-export([clear/0]).
4950

5051
-export([
51-
khepri_exchange_path/1,
52-
khepri_exchange_serial_path/1,
53-
khepri_exchanges_path/0,
54-
khepri_exchange_serials_path/0
52+
khepri_exchange_path/1, khepri_exchange_path/2,
53+
khepri_exchange_serial_path/1, khepri_exchange_serial_path/2
5554
]).
5655

5756
-define(MNESIA_TABLE, rabbit_exchange).
@@ -81,7 +80,8 @@ get_all_in_mnesia() ->
8180
rabbit_db:list_in_mnesia(?MNESIA_TABLE, #exchange{_ = '_'}).
8281

8382
get_all_in_khepri() ->
84-
rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_khepri:if_has_data_wildcard()]).
83+
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, #if_has_data{}),
84+
rabbit_db:list_in_khepri(Path).
8585

8686
-spec get_all(VHostName) -> [Exchange] when
8787
VHostName :: vhost:name(),
@@ -103,7 +103,8 @@ get_all_in_mnesia(VHost) ->
103103
rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match).
104104

105105
get_all_in_khepri(VHost) ->
106-
rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_khepri:if_has_data_wildcard()]).
106+
Path = khepri_exchange_path(VHost, #if_has_data{}),
107+
rabbit_db:list_in_khepri(Path).
107108

108109
%% -------------------------------------------------------------------
109110
%% get_all_durable().
@@ -127,7 +128,7 @@ get_all_durable_in_mnesia() ->
127128
rabbit_db:list_in_mnesia(rabbit_durable_exchange, #exchange{_ = '_'}).
128129

129130
get_all_durable_in_khepri() ->
130-
rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [rabbit_khepri:if_has_data_wildcard()]).
131+
get_all_in_khepri().
131132

132133
%% -------------------------------------------------------------------
133134
%% list().
@@ -202,7 +203,8 @@ get_in_khepri(Name) ->
202203
Ret :: [Exchange :: rabbit_types:exchange()].
203204

204205
get_in_khepri_tx(Name) ->
205-
case khepri_tx:get(khepri_exchange_path(Name)) of
206+
Path = khepri_exchange_path(Name),
207+
case khepri_tx:get(Path) of
206208
{ok, X} -> [X];
207209
_ -> []
208210
end.
@@ -261,7 +263,11 @@ count_in_mnesia() ->
261263
mnesia:table_info(?MNESIA_TABLE, size).
262264

263265
count_in_khepri() ->
264-
rabbit_khepri:count_children(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR]).
266+
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
267+
case rabbit_khepri:count(Path) of
268+
{ok, Count} -> Count;
269+
_ -> 0
270+
end.
265271

266272
%% -------------------------------------------------------------------
267273
%% update().
@@ -719,8 +725,8 @@ recover_in_khepri(VHost) ->
719725
%% cannot be skipped and stopping the node is not an option -
720726
%% the next boot most likely would behave the same way.
721727
%% Any other request stays with the default timeout, currently 30s.
722-
Exchanges0 = rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [VHost, rabbit_khepri:if_has_data_wildcard()],
723-
#{timeout => infinity}),
728+
Path = khepri_exchange_path(VHost, #if_has_data{}),
729+
Exchanges0 = rabbit_db:list_in_khepri(Path, #{timeout => infinity}),
724730
Exchanges = [rabbit_exchange_decorator:set(X) || X <- Exchanges0],
725731

726732
rabbit_khepri:transaction(
@@ -765,7 +771,8 @@ match_in_mnesia(Pattern) ->
765771

766772
match_in_khepri(Pattern0) ->
767773
Pattern = #if_data_matches{pattern = Pattern0},
768-
rabbit_db:list_in_khepri(khepri_exchanges_path() ++ [?KHEPRI_WILDCARD_STAR, Pattern]).
774+
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, Pattern),
775+
rabbit_db:list_in_khepri(Path).
769776

770777
%% -------------------------------------------------------------------
771778
%% exists().
@@ -814,8 +821,17 @@ clear_in_mnesia() ->
814821
ok.
815822

816823
clear_in_khepri() ->
817-
khepri_delete(khepri_exchanges_path()),
818-
khepri_delete(khepri_exchange_serials_path()).
824+
clear_exchanges_in_khepri(),
825+
clear_exchange_serials_in_khepri().
826+
827+
clear_exchanges_in_khepri() ->
828+
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
829+
khepri_delete(Path).
830+
831+
clear_exchange_serials_in_khepri() ->
832+
Path = khepri_exchange_serial_path(
833+
?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
834+
khepri_delete(Path).
819835

820836
khepri_delete(Path) ->
821837
case rabbit_khepri:delete(Path) of
@@ -875,25 +891,18 @@ maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
875891
%% Khepri paths
876892
%% -------------------------------------------------------------------
877893

878-
khepri_exchanges_path() ->
879-
[?MODULE, exchanges].
880-
881894
khepri_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
882-
[?MODULE, exchanges, VHost, Name].
895+
khepri_exchange_path(VHost, Name).
883896

884-
khepri_exchange_serials_path() ->
885-
[?MODULE, exchange_serials].
897+
khepri_exchange_path(VHost, Name)
898+
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
899+
?IS_KHEPRI_PATH_CONDITION(Name) ->
900+
[?MODULE, exchanges, VHost, Name].
886901

887902
khepri_exchange_serial_path(#resource{virtual_host = VHost, name = Name}) ->
888-
[?MODULE, exchange_serials, VHost, Name].
889-
890-
%% -------------------------------------------------------------------
891-
%% path().
892-
%% -------------------------------------------------------------------
903+
khepri_exchange_serial_path(VHost, Name).
893904

894-
-spec path(ExchangeName) -> Path when
895-
ExchangeName :: rabbit_exchange:name(),
896-
Path :: khepri_path:path().
897-
898-
path(Name) ->
899-
khepri_exchange_path(Name).
905+
khepri_exchange_serial_path(VHost, Name)
906+
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
907+
?IS_KHEPRI_PATH_CONDITION(Name) ->
908+
[?MODULE, exchange_serials, VHost, Name].

deps/rabbit/src/rabbit_db_exchange_m2k_converter.erl

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,6 @@ delete_from_khepri(rabbit_exchange_serial = Table, Key, State) ->
129129
Table :: atom().
130130

131131
clear_data_in_khepri(rabbit_exchange) ->
132-
khepri_delete(rabbit_db_exchange:khepri_exchanges_path());
132+
rabbit_db_exchange:clear_exchanges_in_khepri();
133133
clear_data_in_khepri(rabbit_exchange_serial) ->
134-
khepri_delete(rabbit_db_exchange:khepri_exchange_serials_path()).
135-
136-
khepri_delete(Path) ->
137-
case rabbit_khepri:delete(Path) of
138-
ok -> ok;
139-
Error -> throw(Error)
140-
end.
134+
rabbit_db_exchange:clear_exchange_serials_in_khepri().

deps/rabbit/src/rabbit_db_maintenance.erl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
-module(rabbit_db_maintenance).
99

10+
-include_lib("khepri/include/khepri.hrl").
1011
-include_lib("rabbit_common/include/rabbit.hrl").
1112

1213
-export([
@@ -17,8 +18,7 @@
1718
]).
1819

1920
-export([
20-
khepri_maintenance_path/1,
21-
khepri_maintenance_path/0
21+
khepri_maintenance_path/1
2222
]).
2323

2424
-define(TABLE, rabbit_node_maintenance_states).
@@ -167,8 +167,5 @@ get_consistent_in_khepri(Node) ->
167167
%% Khepri paths
168168
%% -------------------------------------------------------------------
169169

170-
khepri_maintenance_path() ->
171-
[?MODULE, maintenance].
172-
173-
khepri_maintenance_path(Node) ->
170+
khepri_maintenance_path(Node) when ?IS_KHEPRI_PATH_CONDITION(Node) ->
174171
[?MODULE, maintenance, Node].

deps/rabbit/src/rabbit_db_maintenance_m2k_converter.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-behaviour(mnesia_to_khepri_converter).
1111

1212
-include_lib("kernel/include/logger.hrl").
13+
-include_lib("khepri/include/khepri.hrl").
1314
-include_lib("khepri_mnesia_migration/src/kmm_logging.hrl").
1415
-include_lib("rabbit_common/include/rabbit.hrl").
1516

@@ -95,7 +96,7 @@ delete_from_khepri(rabbit_node_maintenance_states = Table, Key, State) ->
9596
Table :: atom().
9697

9798
clear_data_in_khepri(rabbit_node_maintenance_states) ->
98-
Path = rabbit_db_maintenance:khepri_maintenance_path(),
99+
Path = rabbit_db_maintenance:khepri_maintenance_path(?KHEPRI_WILDCARD_STAR),
99100
case rabbit_khepri:delete(Path) of
100101
ok -> ok;
101102
Error -> throw(Error)

0 commit comments

Comments
 (0)