2
2
3
3
-include (" mongoose.hrl" ).
4
4
5
- -export ([
6
- start /0 ,
7
- get_cached_cluster_id /0 ,
8
- get_backend_cluster_id /0
9
- ]).
5
+ -export ([start /0 , get_cached_cluster_id /0 ]).
10
6
11
7
% For testing purposes only
12
- -export ([clean_table /0 , clean_cache /0 ]).
8
+ -export ([clean_table /0 , clean_cache /0 , get_backend_cluster_id / 0 ]).
13
9
14
10
-ignore_xref ([clean_table / 0 , clean_cache / 0 , get_backend_cluster_id / 0 ]).
15
11
16
12
-record (mongoose_cluster_id , {key :: atom (), value :: cluster_id ()}).
17
13
-type cluster_id () :: binary ().
18
14
-type maybe_cluster_id () :: {ok , cluster_id ()} | {error , any ()}.
19
- -type mongoose_backend () :: rdbms
20
- | mnesia
21
- | cets .
15
+ -type persistent_backend () :: rdbms | {error , none }.
16
+ -type volatile_backend () :: mnesia | cets .
22
17
23
18
-spec start () -> maybe_cluster_id ().
24
19
start () ->
25
20
% % Consider rewriting this logic, so it does not block the starting process.
26
21
% % Currently, we have to do an SQL query each time we restart MongooseIM
27
22
% % application in the tests.
28
23
init_cache (),
29
- Backend = which_backend_available (),
30
- IntBackend = which_volatile_backend_available (),
31
- maybe_prepare_queries (Backend ),
24
+ PersistentBackend = which_persistent_backend_enabled (),
25
+ VolatileBackend = which_volatile_backend_available (),
26
+ maybe_prepare_queries (PersistentBackend ),
32
27
cets_long :run_tracked (#{task => wait_for_any_backend ,
33
- backend => Backend , volatile_backend => IntBackend },
34
- fun () -> wait_for_any_backend (Backend , IntBackend ) end ),
35
- CachedRes = get_cached_cluster_id (IntBackend ),
36
- BackendRes = get_backend_cluster_id (),
28
+ backend => PersistentBackend , volatile_backend => VolatileBackend },
29
+ fun () -> wait_for_any_backend (PersistentBackend , VolatileBackend ) end ),
30
+ CachedRes = get_cached_cluster_id (VolatileBackend ),
31
+ BackendRes = get_backend_cluster_id (PersistentBackend ),
37
32
case {CachedRes , BackendRes } of
38
33
{{ok , ID }, {ok , ID }} ->
39
34
{ok , ID };
40
35
{{ok , ID }, {error , _ }} ->
41
- set_new_cluster_id (ID , Backend );
36
+ persist_cluster_id (ID , PersistentBackend );
42
37
{{error , _ }, {ok , ID }} ->
43
- set_new_cluster_id (ID , IntBackend );
38
+ cache_cluster_id (ID , VolatileBackend );
44
39
{{error , _ }, {error , _ }} ->
45
- make_and_set_new_cluster_id ();
40
+ make_and_set_new_cluster_id (PersistentBackend , VolatileBackend );
46
41
{{ok , CachedID }, {ok , BackendID }} ->
47
42
? LOG_ERROR (#{what => cluster_id_setup_conflict ,
48
43
text => <<" Mnesia and Backend have different cluster IDs" >>,
@@ -53,12 +48,14 @@ start() ->
53
48
54
49
% % If RDBMS is available before CETS - it is enough for us to continue
55
50
% % the starting procedure
56
- wait_for_any_backend (Backend , IntBackend ) ->
51
+ wait_for_any_backend (PersistentBackend , VolatileBackend ) ->
57
52
Alias = erlang :alias ([reply ]),
58
- Pids = lists :append ([wait_for_backend_promise (B , Alias ) || B <- lists :sort ([Backend , IntBackend ])]),
53
+ Pids = lists :append ([wait_for_backend_promise (B , Alias )
54
+ || B <- lists :sort ([PersistentBackend , VolatileBackend ])]),
59
55
wait_for_first_reply (Alias ),
60
56
% % Interrupt other waiting calls to reduce the logging noise
61
57
[erlang :exit (Pid , shutdown ) || Pid <- Pids ],
58
+ clear_pending_replies (Alias ),
62
59
ok .
63
60
64
61
wait_for_first_reply (Alias ) ->
@@ -67,9 +64,13 @@ wait_for_first_reply(Alias) ->
67
64
ok
68
65
end .
69
66
70
- wait_for_backend_promise (mnesia , Alias ) ->
71
- Alias ! {ready , Alias },
72
- [];
67
+ clear_pending_replies (Alias ) ->
68
+ receive
69
+ {ready , Alias } -> clear_pending_replies (Alias )
70
+ after
71
+ 0 -> ok
72
+ end .
73
+
73
74
wait_for_backend_promise (cets , Alias ) ->
74
75
[spawn (fun () ->
75
76
% % We have to do it, because we want to read from across the cluster
@@ -81,7 +82,10 @@ wait_for_backend_promise(rdbms, Alias) ->
81
82
[spawn (fun () ->
82
83
cets_long :run_tracked (#{task => wait_for_rdbms }, fun () -> wait_for_rdbms () end ),
83
84
Alias ! {ready , Alias }
84
- end )].
85
+ end )];
86
+ wait_for_backend_promise (_ , Alias ) ->
87
+ Alias ! {ready , Alias },
88
+ [].
85
89
86
90
wait_for_rdbms () ->
87
91
case get_backend_cluster_id (rdbms ) of
@@ -121,16 +125,14 @@ get_cached_cluster_id(cets) ->
121
125
% % ====================================================================
122
126
-spec get_backend_cluster_id () -> maybe_cluster_id ().
123
127
get_backend_cluster_id () ->
124
- get_backend_cluster_id (which_backend_available ()).
125
-
126
- -spec set_new_cluster_id (cluster_id ()) -> maybe_cluster_id ().
127
- set_new_cluster_id (ID ) ->
128
- set_new_cluster_id (ID , which_backend_available ()).
128
+ get_backend_cluster_id (which_persistent_backend_enabled ()).
129
129
130
- -spec make_and_set_new_cluster_id () -> maybe_cluster_id ().
131
- make_and_set_new_cluster_id () ->
132
- NewID = make_cluster_id (),
133
- set_new_cluster_id (NewID ).
130
+ -spec make_and_set_new_cluster_id (persistent_backend (), volatile_backend ()) ->
131
+ maybe_cluster_id ().
132
+ make_and_set_new_cluster_id (PersistentBackend , VolatileBackend ) ->
133
+ NewID = make_cluster_id (PersistentBackend ),
134
+ persist_cluster_id (NewID , PersistentBackend ),
135
+ cache_cluster_id (NewID , VolatileBackend ).
134
136
135
137
% % ====================================================================
136
138
% % Internal functions
@@ -149,31 +151,36 @@ init_cache(cets) ->
149
151
cets :start (cets_cluster_id , #{}),
150
152
cets_discovery :add_table (mongoose_cets_discovery , cets_cluster_id ).
151
153
152
- -spec maybe_prepare_queries (mongoose_backend ()) -> ok .
153
- maybe_prepare_queries (mnesia ) -> ok ;
154
+ -spec maybe_prepare_queries (persistent_backend ()) -> any ().
154
155
maybe_prepare_queries (rdbms ) ->
155
156
mongoose_rdbms :prepare (cluster_insert_new , mongoose_cluster_id , [v ],
156
157
<<" INSERT INTO mongoose_cluster_id(k,v) VALUES ('cluster_id', ?)" >>),
157
158
mongoose_rdbms :prepare (cluster_select , mongoose_cluster_id , [],
158
- <<" SELECT v FROM mongoose_cluster_id WHERE k='cluster_id'" >>),
159
+ <<" SELECT v FROM mongoose_cluster_id WHERE k='cluster_id'" >>);
160
+ maybe_prepare_queries (_ ) ->
159
161
ok .
160
162
161
163
-spec execute_cluster_insert_new (binary ()) -> mongoose_rdbms :query_result ().
162
164
execute_cluster_insert_new (ID ) ->
163
165
mongoose_rdbms :execute_successfully (global , cluster_insert_new , [ID ]).
164
166
165
- -spec make_cluster_id () -> cluster_id ().
166
- make_cluster_id () ->
167
- uuid :uuid_to_string (uuid :get_v4 (), binary_standard ).
167
+ % % If there's no persistent backend, cluster IDs will be recreated on every cluster restart,
168
+ % % hence prefix them as ephemeral to re-classify them later.
169
+ -spec make_cluster_id (persistent_backend ()) -> cluster_id ().
170
+ make_cluster_id (rdbms ) ->
171
+ uuid :uuid_to_string (uuid :get_v4 (), binary_standard );
172
+ make_cluster_id ({error , none }) ->
173
+ <<" ephemeral-" , (uuid :uuid_to_string (uuid :get_v4 (), binary_standard ))/binary >>.
168
174
169
- % % Which backend is enabled
170
- -spec which_backend_available () -> mongoose_backend ().
171
- which_backend_available () ->
175
+ % % Which persistent backend is enabled
176
+ -spec which_persistent_backend_enabled () -> persistent_backend ().
177
+ which_persistent_backend_enabled () ->
172
178
case mongoose_wpool :get_pool_settings (rdbms , global , default ) of
173
- undefined -> which_volatile_backend_available () ;
179
+ undefined -> { error , none } ;
174
180
_ -> rdbms
175
181
end .
176
182
183
+ -spec which_volatile_backend_available () -> volatile_backend ().
177
184
which_volatile_backend_available () ->
178
185
case mongoose_config :get_opt (internal_databases ) of
179
186
#{cets := _ } ->
@@ -182,11 +189,10 @@ which_volatile_backend_available() ->
182
189
mnesia
183
190
end .
184
191
185
- -spec set_new_cluster_id (cluster_id (), mongoose_backend ()) -> ok | { error , any ()} .
186
- set_new_cluster_id (ID , rdbms ) ->
192
+ -spec persist_cluster_id (cluster_id (), persistent_backend ()) -> maybe_cluster_id () .
193
+ persist_cluster_id (ID , rdbms ) ->
187
194
try execute_cluster_insert_new (ID ) of
188
195
{updated , 1 } ->
189
- set_new_cluster_id (ID , which_volatile_backend_available ()),
190
196
{ok , ID }
191
197
catch
192
198
Class :Reason :Stacktrace ->
@@ -196,20 +202,24 @@ set_new_cluster_id(ID, rdbms) ->
196
202
class => Class , reason => Reason , stacktrace => Stacktrace }),
197
203
{error , {Class , Reason }}
198
204
end ;
199
- set_new_cluster_id (ID , mnesia ) ->
205
+ persist_cluster_id (ID , {error , none }) ->
206
+ {ok , ID }.
207
+
208
+ -spec cache_cluster_id (cluster_id (), volatile_backend ()) -> maybe_cluster_id ().
209
+ cache_cluster_id (ID , mnesia ) ->
200
210
T = fun () -> mnesia :write (# mongoose_cluster_id {key = cluster_id , value = ID }) end ,
201
211
case mnesia :transaction (T ) of
202
212
{atomic , ok } ->
203
213
{ok , ID };
204
214
{aborted , Reason } ->
205
215
{error , Reason }
206
216
end ;
207
- set_new_cluster_id (ID , cets ) ->
217
+ cache_cluster_id (ID , cets ) ->
208
218
cets :insert_serial (cets_cluster_id , {cluster_id , ID }),
209
219
{ok , ID }.
210
220
211
221
% % Get cluster ID
212
- -spec get_backend_cluster_id (mongoose_backend ()) -> maybe_cluster_id ().
222
+ -spec get_backend_cluster_id (persistent_backend ()) -> maybe_cluster_id ().
213
223
get_backend_cluster_id (rdbms ) ->
214
224
try mongoose_rdbms :execute_successfully (global , cluster_select , []) of
215
225
{selected , [{ID }]} -> {ok , ID };
@@ -221,15 +231,13 @@ get_backend_cluster_id(rdbms) ->
221
231
class => Class , reason => Reason , stacktrace => Stacktrace }),
222
232
{error , {Class , Reason }}
223
233
end ;
224
- get_backend_cluster_id (mnesia ) ->
225
- get_cached_cluster_id (mnesia );
226
- get_backend_cluster_id (cets ) ->
227
- get_cached_cluster_id (cets ).
234
+ get_backend_cluster_id ({error , none }) ->
235
+ {error , no_value_in_backend }.
228
236
229
237
clean_table () ->
230
- clean_table (which_backend_available ()).
238
+ clean_table (which_persistent_backend_enabled ()).
231
239
232
- -spec clean_table (mongoose_backend ()) -> ok | {error , any ()}.
240
+ -spec clean_table (persistent_backend ()) -> ok | {error , any ()}.
233
241
clean_table (rdbms ) ->
234
242
SQLQuery = [<<" TRUNCATE TABLE mongoose_cluster_id;" >>],
235
243
try mongoose_rdbms :sql_query (global , SQLQuery ) of
0 commit comments