Skip to content

Commit a9697a3

Browse files
Merge pull request #4402 from esl/fix-cockroachdb-consistency
Fix cockroachdb consistency
2 parents 1d43a17 + ccfa70b commit a9697a3

File tree

4 files changed

+118
-38
lines changed

4 files changed

+118
-38
lines changed

big_tests/test.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@
286286
workers = 10
287287
[outgoing_pools.rdbms.default]
288288
scope = "global"
289-
workers = 1
289+
workers = 5
290290
connection.driver = "cockroachdb"
291291
connection.host = "localhost"
292292
connection.port = 26257

big_tests/tests/pubsub_SUITE.erl

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
require_rpc_nodes/1,
2323
subhost_pattern/1,
2424
rpc/4]).
25+
-import(domain_helper, [host_type/0]).
2526

2627
%%--------------------------------------------------------------------
2728
%% Suite configuration
@@ -49,17 +50,26 @@ group_is_compatible(hometree_specific, OnlyNodetreeTree) -> OnlyNodetreeTree =:=
4950
group_is_compatible(_, _) -> true.
5051

5152
base_groups() ->
52-
[{basic, [parallel], basic_tests()},
53-
{service_config, [parallel], service_config_tests()},
54-
{node_config, [parallel], node_config_tests()},
55-
{node_affiliations, [parallel], node_affiliations_tests()},
56-
{manage_subscriptions, [parallel], manage_subscriptions_tests()},
53+
[{basic, parallel_props(), basic_tests()},
54+
{service_config, parallel_props(), service_config_tests()},
55+
{node_config, parallel_props(), node_config_tests()},
56+
{node_affiliations, parallel_props(), node_affiliations_tests()},
57+
{manage_subscriptions, parallel_props(), manage_subscriptions_tests()},
5758
{collection, [sequence], collection_tests()},
58-
{collection_config, [parallel], collection_config_tests()},
59-
{debug_calls, [parallel], debug_calls_tests()},
60-
{pubsub_item_publisher_option, [parallel], pubsub_item_publisher_option_tests()},
59+
{collection_config, parallel_props(), collection_config_tests()},
60+
{debug_calls, parallel_props(), debug_calls_tests()},
61+
{pubsub_item_publisher_option, parallel_props(), pubsub_item_publisher_option_tests()},
6162
{hometree_specific, [sequence], hometree_specific_tests()},
62-
{last_item_cache, [parallel], last_item_cache_tests()}].
63+
{last_item_cache, parallel_props(), last_item_cache_tests()}].
64+
65+
parallel_props() ->
66+
case rpc(mim(), mongoose_rdbms, db_engine, [host_type()]) of
67+
cockroachdb ->
68+
%% Parallel pubsub tests are flaky on CockroachDB
69+
[parallel, {repeat_until_all_ok, 5}];
70+
_ ->
71+
[parallel]
72+
end.
6373

6474
basic_tests() ->
6575
[

big_tests/tests/rdbms_SUITE.erl

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ rdbms_queries_cases() ->
8282
test_request_transaction,
8383
test_restart_transaction_with_execute,
8484
test_restart_transaction_with_execute_eventually_passes,
85+
prepare_without_execute_does_not_cause_inconsistency,
8586
test_failed_transaction_with_execute_wrapped,
8687
test_failed_wrapper_transaction,
8788
test_incremental_upsert,
@@ -104,8 +105,7 @@ init_per_suite(Config) ->
104105
orelse mongoose_helper:is_rdbms_enabled(host_type()) of
105106
false -> {skip, rdbms_or_ct_not_running};
106107
true ->
107-
%% Warning: inject_module does not really work well with --rerun-big-tests flag
108-
mongoose_helper:inject_module(?MODULE),
108+
mongoose_helper:inject_module(?MODULE, reload),
109109
Config1 = mongoose_helper:backup_and_set_config_option(Config, [instrumentation, probe_interval], 1),
110110
escalus:init_per_suite(Config1)
111111
end.
@@ -115,15 +115,15 @@ end_per_suite(Config) ->
115115
escalus:end_per_suite(Config).
116116

117117
init_per_group(tagged_rdbms_queries, Config) ->
118-
ExtraConfig = stop_global_default_pool(),
119-
instrument_helper:start(declared_events(tagged_rdbms_queries)),
118+
ExtraConfig = stop_global_default_pool() ++ [{scope, host_type()}, {tag, tag()}],
119+
instrument_helper:start(declared_events(ExtraConfig)),
120120
start_local_host_type_pool(ExtraConfig),
121121
ExtraConfig ++ Config;
122122
init_per_group(global_rdbms_queries, Config) ->
123-
ExtraConfig = stop_global_default_pool(),
124-
instrument_helper:start(declared_events(global_rdbms_queries)),
123+
ExtraConfig = stop_global_default_pool() ++ [{scope, global}, {tag, default}],
124+
instrument_helper:start(declared_events(ExtraConfig)),
125125
restart_global_default_pool(ExtraConfig),
126-
[{tag, global} | Config].
126+
ExtraConfig ++ Config.
127127

128128
end_per_group(tagged_rdbms_queries, Config) ->
129129
stop_local_host_type_pool(),
@@ -136,6 +136,14 @@ end_per_group(global_rdbms_queries, Config) ->
136136
init_per_testcase(test_incremental_upsert, Config) ->
137137
erase_inbox(Config),
138138
escalus:init_per_testcase(test_incremental_upsert, Config);
139+
init_per_testcase(Case = prepare_without_execute_does_not_cause_inconsistency, Config) ->
140+
case is_pgsql() orelse is_cockroachdb() of
141+
true ->
142+
erase_inbox(Config),
143+
escalus:init_per_testcase(Case, Config);
144+
false ->
145+
{skip, "Test for a Postgres-specific issue"}
146+
end;
139147
init_per_testcase(CaseName, Config) ->
140148
escalus:init_per_testcase(CaseName, Config).
141149

@@ -147,18 +155,25 @@ end_per_testcase(CaseName, Config)
147155
CaseName =:= test_failed_wrapper_transaction ->
148156
rpc(mim(), meck, unload, []),
149157
escalus:end_per_testcase(CaseName, Config);
158+
end_per_testcase(Case = prepare_without_execute_does_not_cause_inconsistency, Config) ->
159+
erase_inbox(Config),
160+
rpc(mim(), meck, unload, []),
161+
escalus:end_per_testcase(Case, Config);
150162
end_per_testcase(test_incremental_upsert, Config) ->
151163
erase_inbox(Config),
152164
escalus:end_per_testcase(test_incremental_upsert, Config);
165+
end_per_testcase(Case = test_wrapped_request, Config) ->
166+
Tag = ?config(tag, Config),
167+
ok = rpc(mim(), mongoose_instrument, tear_down, [wrapper_event_spec(Tag)]),
168+
escalus:end_per_testcase(Case, Config);
153169
end_per_testcase(CaseName, Config) ->
154170
escalus:end_per_testcase(CaseName, Config).
155171

156-
declared_events(tagged_rdbms_queries) ->
157-
instrument_helper:declared_events(mongoose_wpool_rdbms, [host_type(), tag()]) ++
158-
[{test_wrapped_request, #{pool_tag => tag()}}];
159-
declared_events(global_rdbms_queries) ->
160-
instrument_helper:declared_events(mongoose_wpool_rdbms, [global, default]) ++
161-
[{test_wrapped_request, #{pool_tag => global}}].
172+
declared_events(Config) ->
173+
Scope = ?config(scope, Config),
174+
Tag = ?config(tag, Config),
175+
instrument_helper:declared_events(mongoose_wpool_rdbms, [Scope, Tag]) ++
176+
[{test_wrapped_request, #{pool_tag => Tag}}].
162177

163178
%%--------------------------------------------------------------------
164179
%% Data for cases
@@ -369,11 +384,11 @@ read_prep_boolean_case(Config) ->
369384

370385
select_current_timestamp_case(Config) ->
371386
ok = rpc(mim(), mongoose_rdbms_timestamp, prepare, []),
372-
Res = case ?config(tag, Config) of
373-
global ->
387+
Res = case {?config(scope, Config), ?config(tag, Config)} of
388+
{global, default} ->
374389
rpc(mim(), mongoose_rdbms_timestamp, select, []);
375-
Tag ->
376-
rpc(mim(), mongoose_rdbms_timestamp, select, [host_type(), Tag])
390+
{Scope, Tag} ->
391+
rpc(mim(), mongoose_rdbms_timestamp, select, [Scope, Tag])
377392
end,
378393
assert_is_integer(Res).
379394

@@ -540,6 +555,31 @@ test_restart_transaction_with_execute_eventually_passes(Config) ->
540555
called_times(3),
541556
ok.
542557

558+
prepare_without_execute_does_not_cause_inconsistency(Config) ->
559+
prepare_insert_int8(Config),
560+
Args = [rdbms, ?config(scope, Config), ?config(tag, Config)],
561+
Pool = rpc(mim(), mongoose_wpool, make_pool_name, Args),
562+
[Key1, Key2] = make_hash_keys(2, Pool),
563+
564+
%% Simulate a call to 'prepare' with missing subsequent 'execute'
565+
ok = rpc(mim(), meck, new, [mongoose_rdbms_backend, [passthrough, no_link]]),
566+
ok = rpc(mim(), meck, expect, [mongoose_rdbms_backend, execute, 4, ok]),
567+
ok = call_worker(Args, Key1, {sql_execute, insert_int8, [1]}),
568+
569+
%% Insert the data using a different worker
570+
Insert = <<"INSERT INTO inbox VALUES ('alice', 'localhost', 'bob@localhost', "
571+
"'msg-id', 'inbox', 'content', 14, 0, 0)">>,
572+
{updated, 1} = call_worker(Args, Key2, {sql_query, Insert}),
573+
574+
%% Make sure that worker 1 is not stuck in the previous transaction after 'prepare'
575+
{selected, [{<<"14">>}]} = call_worker(Args, Key1, {sql_query, <<"SELECT timestamp FROM inbox">>}).
576+
577+
%% Send the SQL command to a particular DB worker
578+
call_worker(Args, HashKey, Operation) ->
579+
TS = rpc(mim(), erlang, monotonic_time, [millisecond]),
580+
Command = {sql_cmd, Operation, TS},
581+
rpc(mim(), mongoose_wpool, call, Args ++ [HashKey, Command]).
582+
543583
test_failed_transaction_with_execute_wrapped(Config) ->
544584
% given
545585
HostType = host_type(),
@@ -687,11 +727,11 @@ test_upsert_many2_replaces_existing(Config) ->
687727

688728
pool_probe_metrics_are_updated(Config) ->
689729
Tag = ?config(tag, Config),
690-
{Event, Labels} = case Tag of
730+
{Event, Labels} = case ?config(scope, Config) of
691731
global ->
692-
{wpool_global_rdbms_stats, #{pool_tag => default}};
693-
Tag ->
694-
{wpool_rdbms_stats, #{host_type => host_type(), pool_tag => Tag}}
732+
{wpool_global_rdbms_stats, #{pool_tag => Tag}};
733+
Scope ->
734+
{wpool_rdbms_stats, #{host_type => Scope, pool_tag => Tag}}
695735
end,
696736
#{recv_oct := Recv, send_oct := Send} = rpc(mim(), mongoose_wpool_rdbms, probe, [Event, Labels]),
697737

@@ -718,10 +758,12 @@ tag() ->
718758
extra_tag.
719759

720760
scope_and_tag(Config) ->
721-
case ?config(tag, Config) of
722-
global -> [host_type()];
723-
Tag -> [host_type(), Tag]
724-
end.
761+
skip_default_tag([?config(scope, Config), ?config(tag, Config)]).
762+
763+
skip_default_tag([Scope, default]) ->
764+
[Scope];
765+
skip_default_tag(ScopeAndTag) ->
766+
ScopeAndTag.
725767

726768
sql_query(Config, Query) ->
727769
ScopeAndTag = scope_and_tag(Config),
@@ -1285,7 +1327,7 @@ stop_global_default_pool() ->
12851327
[GlobalRdbmsPool] = [Pool || Pool = #{type := rdbms, scope := global, tag := default} <- Pools],
12861328
ok = rpc(mim(), mongoose_wpool, stop, [rdbms, global, default]),
12871329
Extra = maybe_stop_service_domain_db(),
1288-
[{tag, tag()}, {global_default_rdbms_pool, GlobalRdbmsPool} | Extra].
1330+
[{global_default_rdbms_pool, GlobalRdbmsPool} | Extra].
12891331

12901332
restart_global_default_pool(Config) ->
12911333
GlobalRdbmsPool = ?config(global_default_rdbms_pool, Config),
@@ -1390,3 +1432,24 @@ check_like_not_matching_prep(SelName, Config, _TextValue, NotMatching, Info) ->
13901432
SelectResult,
13911433
Info#{pattern => NotMatching,
13921434
select_result => SelectResult}).
1435+
1436+
%% Generate a list of Num numerical keys resolving to different Pool workers using hash_worker
1437+
make_hash_keys(Num, Pool) ->
1438+
Workers = rpc(mim(), wpool_pool, get_workers, [Pool]),
1439+
if Num =< length(Workers) ->
1440+
lists:reverse(make_hash_keys(Num, Pool, Workers, 1, []));
1441+
true ->
1442+
ct:fail("Not enough workers in ~p (needed: ~p, actual: ~p)",
1443+
[Pool, Num, length(Workers)])
1444+
end.
1445+
1446+
make_hash_keys(RemainingNum, Pool, Workers, Key, Acc) when RemainingNum > 0 ->
1447+
Worker = rpc(mim(), wpool_pool, hash_worker, [Pool, Key]),
1448+
case lists:member(Worker, Workers) of
1449+
true ->
1450+
make_hash_keys(RemainingNum - 1, Pool, Workers -- [Worker], Key + 1, [Key | Acc]);
1451+
false ->
1452+
make_hash_keys(RemainingNum, Pool, Workers, Key + 1, Acc)
1453+
end;
1454+
make_hash_keys(0, _WorkerNum, _Workers, _Key, Acc) ->
1455+
Acc.

src/rdbms/mongoose_rdbms_pgsql.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,15 @@ prepare(Connection, Name, _Table, _Fields, Statement) ->
7070
BinName = [atom_to_binary(Name, latin1)],
7171
ReplacedStatement = replace_question_marks(Statement),
7272
case epgsql:parse(Connection, BinName, ReplacedStatement, []) of
73-
{ok, _} -> epgsql:describe(Connection, statement, BinName);
74-
Error -> Error
73+
{ok, PreparedStatement} ->
74+
%% If the statement is not executed for some reason, the transaction would remain open.
75+
%% The safest way to fix this is to issue the Sync message.
76+
%% Performance impact is minimal, because 'prepare' is called only once.
77+
%% See https://www.postgresql.org/docs/current/protocol-flow.html for details
78+
ok = epgsql:sync(Connection),
79+
{ok, PreparedStatement};
80+
{error, Reason} ->
81+
{error, Reason}
7582
end.
7683

7784
-spec execute(Connection :: term(), StatementRef :: term(), Params :: [term()],

0 commit comments

Comments (0)