
Commit 2d885ce

Fixed resume of initial replication with ignore_deletes option (#173)

With ignore_deletes: true, resuming an interrupted initial replication failed with
"Database <target>_tmp does not exist" (issue #172): the temporary target database was
only resolved inside run(), so a resumed run could disagree with the first one. The
target is now resolved once in DbReplicator.__init__, the hardcoded batch-size constant
is replaced by a configurable initial_replication_batch_size setting, and a test-only
CLI flag makes it possible to interrupt initial replication partway through.

1 parent cdc044e

File tree: 6 files changed, +195 −24 lines

mysql_ch_replicator/config.py
Lines changed: 3 additions & 0 deletions

@@ -107,6 +107,7 @@ class Settings:
     DEFAULT_OPTIMIZE_INTERVAL = 86400
     DEFAULT_CHECK_DB_UPDATED_INTERVAL = 120
     DEFAULT_AUTO_RESTART_INTERVAL = 3600
+    DEFAULT_INITIAL_REPLICATION_BATCH_SIZE = 50000

     def __init__(self):
         self.mysql = MysqlSettings()
@@ -131,6 +132,7 @@ def __init__(self):
         self.initial_replication_threads = 0
         self.ignore_deletes = False
         self.mysql_timezone = 'UTC'
+        self.initial_replication_batch_size = 50000

     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -158,6 +160,7 @@ def load(self, settings_file):
         self.initial_replication_threads = data.pop('initial_replication_threads', 0)
         self.ignore_deletes = data.pop('ignore_deletes', False)
         self.mysql_timezone = data.pop('mysql_timezone', 'UTC')
+        self.initial_replication_batch_size = data.pop('initial_replication_batch_size', Settings.DEFAULT_INITIAL_REPLICATION_BATCH_SIZE)

         indexes = data.pop('indexes', [])
         for index in indexes:
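
The new setting is read with data.pop and a class-level default, so configs that omit it keep the previous 50000 batch size. A minimal usage sketch (Settings and load() are as in the diff above; the file name refers to the test fixture added by this commit):

    from mysql_ch_replicator.config import Settings

    cfg = Settings()
    cfg.load('tests_config_string_primary_key.yaml')  # fixture sets initial_replication_batch_size: 1
    print(cfg.initial_replication_batch_size)         # prints 1; omitting the key falls back to 50000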

mysql_ch_replicator/db_replicator.py
Lines changed: 7 additions & 2 deletions

@@ -88,13 +88,14 @@ def remove(self):

 class DbReplicator:
     def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False,
-                 worker_id: int = None, total_workers: int = None, table: str = None):
+                 worker_id: int = None, total_workers: int = None, table: str = None, initial_replication_test_fail_records: int = None):
         self.config = config
         self.database = database
         self.worker_id = worker_id
         self.total_workers = total_workers
         self.settings_file = config.settings_file
         self.single_table = table  # Store the single table to process
+        self.initial_replication_test_fail_records = initial_replication_test_fail_records  # Test flag for early exit

         # use same as source database by default
         self.target_database = database
@@ -143,6 +144,11 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         self.target_database_tmp = self.target_database + '_tmp'
         if self.is_parallel_worker:
             self.target_database_tmp = self.target_database
+
+        # If ignore_deletes is enabled, we replicate directly into the target DB
+        # This must be set here to ensure consistency between first run and resume
+        if self.config.ignore_deletes:
+            self.target_database_tmp = self.target_database

         self.mysql_api = MySQLApi(
             database=self.database,
@@ -205,7 +211,6 @@ def run(self):
         if self.config.ignore_deletes:
             logger.info(f'using existing database (ignore_deletes=True)')
             self.clickhouse_api.database = self.target_database
-            self.target_database_tmp = self.target_database

         # Create database if it doesn't exist
         if self.target_database not in self.clickhouse_api.get_databases():
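
This is the heart of the fix: with ignore_deletes enabled the replicator writes straight into the target database, never into <target>_tmp, and the choice is now made in __init__ so a fresh run and a resumed run always agree. Previously the reassignment only happened inside run(), so a resumed initial replication could still look for the never-created _tmp database, which is the failure reported in issue #172. A self-contained sketch of the selection logic (the function name is hypothetical; the branches mirror the constructor code above):

    def resolve_target_db(target_database: str, ignore_deletes: bool,
                          is_parallel_worker: bool = False) -> str:
        # Parallel workers and ignore_deletes mode write directly into the target DB;
        # otherwise initial replication stages data in a temporary database first.
        if is_parallel_worker or ignore_deletes:
            return target_database
        return target_database + '_tmp'

    assert resolve_target_db('sirocco', ignore_deletes=True) == 'sirocco'
    assert resolve_target_db('sirocco', ignore_deletes=False) == 'sirocco_tmp'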

mysql_ch_replicator/db_replicator_initial.py
Lines changed: 11 additions & 2 deletions

@@ -20,7 +20,6 @@

 class DbReplicatorInitial:

-    INITIAL_REPLICATION_BATCH_SIZE = 50000
     SAVE_STATE_INTERVAL = 10
     BINLOG_TOUCH_INTERVAL = 120

@@ -180,7 +179,7 @@ def perform_initial_replication_table(self, table_name):
         records = self.replicator.mysql_api.get_records(
             table_name=table_name,
             order_by=primary_keys,
-            limit=self.INITIAL_REPLICATION_BATCH_SIZE,
+            limit=self.replicator.config.initial_replication_batch_size,
             start_value=query_start_values,
             worker_id=self.replicator.worker_id,
             total_workers=self.replicator.total_workers,
@@ -207,6 +206,16 @@ def perform_initial_replication_table(self, table_name):
                 self.prevent_binlog_removal()

             stats_number_of_records += len(records)
+
+            # Test flag: Exit early if we've replicated enough records for testing
+            if (self.replicator.initial_replication_test_fail_records is not None and
+                    stats_number_of_records >= self.replicator.initial_replication_test_fail_records):
+                logger.info(
+                    f'TEST MODE: Exiting initial replication after {stats_number_of_records} records '
+                    f'(limit: {self.replicator.initial_replication_test_fail_records})'
+                )
+                return
+
             curr_time = time.time()
             if curr_time - last_stats_dump_time >= 60.0:
                 last_stats_dump_time = curr_time
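
perform_initial_replication_table pages through the source table in primary-key order, one batch at a time, and the new test flag simply returns out of that loop once enough rows are copied, leaving the saved state mid-replication so a later run must resume. A simplified, self-contained sketch of that loop shape (assumed from the hunk above; fetch_batch stands in for mysql_api.get_records):

    def copy_in_batches(fetch_batch, batch_size, test_fail_records=None):
        total, start = 0, None
        while True:
            records = fetch_batch(start, batch_size)  # rows after `start`, at most batch_size
            if not records:
                return total                          # table exhausted: replication complete
            total += len(records)
            if test_fail_records is not None and total >= test_fail_records:
                return total                          # test-only early exit; state stays partial
            start = records[-1]                       # resume key for the next batch

    rows = list(range(100))
    fetch = lambda start, limit: rows[(0 if start is None else start + 1):][:limit]
    assert copy_in_batches(fetch, batch_size=1, test_fail_records=30) == 30
    assert copy_in_batches(fetch, batch_size=7) == 100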

mysql_ch_replicator/main.py
Lines changed: 5 additions & 0 deletions

@@ -109,6 +109,7 @@ def run_db_replicator(args, config: Settings):
         worker_id=args.worker_id,
         total_workers=args.total_workers,
         table=args.table,
+        initial_replication_test_fail_records=getattr(args, 'initial_replication_test_fail_records', None),
     )
     db_replicator.run()

@@ -169,6 +170,10 @@ def main():
         "--table", type=str, default=None,
         help="Specific table to process (used with --worker_id for parallel processing of a single table)",
     )
+    parser.add_argument(
+        "--initial-replication-test-fail-records", type=int, default=None,
+        help="FOR TESTING ONLY: Exit initial replication after processing this many records",
+    )
     args = parser.parse_args()

     config = Settings()
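
Note that argparse converts the hyphens in --initial-replication-test-fail-records to underscores for the attribute name, which is why run_db_replicator reads args.initial_replication_test_fail_records; the getattr(..., None) guard presumably keeps call sites that build an args namespace without this flag working. A quick standalone check of that standard-library behavior:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--initial-replication-test-fail-records", type=int, default=None)

    args = parser.parse_args(["--initial-replication-test-fail-records", "30"])
    assert args.initial_replication_test_fail_records == 30  # hyphens became underscores
    assert getattr(argparse.Namespace(), 'initial_replication_test_fail_records', None) is None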

test_mysql_ch_replicator.py
Lines changed: 133 additions & 20 deletions

@@ -1179,10 +1179,8 @@ def test_json():


 def test_string_primary_key(monkeypatch):
-    monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
-
     cfg = config.Settings()
-    cfg.load(CONFIG_FILE)
+    cfg.load('tests_config_string_primary_key.yaml')

     mysql = mysql_api.MySQLApi(
         database=None,
@@ -1217,9 +1215,9 @@ def test_string_primary_key(monkeypatch):
         commit=True,
     )

-    binlog_replicator_runner = BinlogReplicatorRunner()
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests_config_string_primary_key.yaml')
     binlog_replicator_runner.run()
-    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests_config_string_primary_key.yaml')
     db_replicator_runner.run()

     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -1241,10 +1239,8 @@ def test_string_primary_key(monkeypatch):


 def test_if_exists_if_not_exists(monkeypatch):
-    monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
-
     cfg = config.Settings()
-    cfg.load(CONFIG_FILE)
+    cfg.load('tests_config_string_primary_key.yaml')

     mysql = mysql_api.MySQLApi(
         database=None,
@@ -1258,9 +1254,9 @@ def test_if_exists_if_not_exists(monkeypatch):

     prepare_env(cfg, mysql, ch)

-    binlog_replicator_runner = BinlogReplicatorRunner()
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests_config_string_primary_key.yaml')
     binlog_replicator_runner.run()
-    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests_config_string_primary_key.yaml')
     db_replicator_runner.run()

     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -1282,10 +1278,8 @@ def test_if_exists_if_not_exists(monkeypatch):


 def test_percona_migration(monkeypatch):
-    monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
-
     cfg = config.Settings()
-    cfg.load(CONFIG_FILE)
+    cfg.load('tests_config_string_primary_key.yaml')

     mysql = mysql_api.MySQLApi(
         database=None,
@@ -1310,9 +1304,9 @@ def test_percona_migration(monkeypatch):
         commit=True,
     )

-    binlog_replicator_runner = BinlogReplicatorRunner()
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests_config_string_primary_key.yaml')
     binlog_replicator_runner.run()
-    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests_config_string_primary_key.yaml')
     db_replicator_runner.run()

     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -1360,10 +1354,8 @@ def test_percona_migration(monkeypatch):


 def test_add_column_first_after_and_drop_column(monkeypatch):
-    monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
-
     cfg = config.Settings()
-    cfg.load(CONFIG_FILE)
+    cfg.load('tests_config_string_primary_key.yaml')

     mysql = mysql_api.MySQLApi(
         database=None,
@@ -1388,9 +1380,9 @@ def test_add_column_first_after_and_drop_column(monkeypatch):
         commit=True,
     )

-    binlog_replicator_runner = BinlogReplicatorRunner()
+    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests_config_string_primary_key.yaml')
     binlog_replicator_runner.run()
-    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests_config_string_primary_key.yaml')
     db_replicator_runner.run()

     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -2912,3 +2904,124 @@ def test_timezone_conversion():
     finally:
         # Clean up temporary config file
         os.unlink(temp_config_file)
+
+
+def test_resume_initial_replication_with_ignore_deletes():
+    """
+    Test that resuming initial replication works correctly with ignore_deletes=True.
+
+    This reproduces the bug from https://github.com/bakwc/mysql_ch_replicator/issues/172
+    where resuming initial replication would fail with "Database sirocco_tmp does not exist"
+    when ignore_deletes=True because the code would try to use the _tmp database instead
+    of the target database directly.
+    """
+    # Create a temporary config file with ignore_deletes=True
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:
+        config_file = temp_config_file.name
+
+        # Read the original config
+        with open(CONFIG_FILE, 'r') as original_config:
+            config_data = yaml.safe_load(original_config)
+
+        # Add ignore_deletes=True
+        config_data['ignore_deletes'] = True
+
+        # Set initial_replication_batch_size to 1 for testing
+        config_data['initial_replication_batch_size'] = 1
+
+        # Write to the temp file
+        yaml.dump(config_data, temp_config_file)
+
+    try:
+        cfg = config.Settings()
+        cfg.load(config_file)
+
+        # Verify the ignore_deletes option was set
+        assert cfg.ignore_deletes is True
+
+        mysql = mysql_api.MySQLApi(
+            database=None,
+            mysql_settings=cfg.mysql,
+        )
+
+        ch = clickhouse_api.ClickhouseApi(
+            database=TEST_DB_NAME,
+            clickhouse_settings=cfg.clickhouse,
+        )
+
+        prepare_env(cfg, mysql, ch)
+
+        # Create a table with many records to ensure initial replication takes time
+        mysql.execute(f'''
+        CREATE TABLE `{TEST_TABLE_NAME}` (
+            id int NOT NULL AUTO_INCREMENT,
+            name varchar(255),
+            data varchar(1000),
+            PRIMARY KEY (id)
+        )
+        ''')
+
+        # Insert many records to make initial replication take longer
+        for i in range(100):
+            mysql.execute(
+                f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');",
+                commit=True
+            )
+
+        # Start binlog replicator
+        binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
+        binlog_replicator_runner.run()
+
+        # Start db replicator for initial replication with test flag to exit early
+        db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file,
+                                                  additional_arguments='--initial-replication-test-fail-records 30')
+        db_replicator_runner.run()
+
+        # Wait for initial replication to start
+        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+        ch.execute_command(f'USE `{TEST_DB_NAME}`')
+        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+
+        # Wait for some records to be replicated but not all (should hit the 30 record limit)
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) > 0)
+
+        # The db replicator should have stopped automatically due to the test flag
+        # But we still call stop() to ensure proper cleanup
+        db_replicator_runner.stop()
+
+        # Verify the state is still PERFORMING_INITIAL_REPLICATION
+        state_path = os.path.join(cfg.binlog_replicator.data_dir, TEST_DB_NAME, 'state.pckl')
+        state = DbReplicatorState(state_path)
+        assert state.status.value == 2  # PERFORMING_INITIAL_REPLICATION
+
+        # Add more records while replication is stopped
+        for i in range(100, 150):
+            mysql.execute(
+                f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('test_{i}', 'data_{i}');",
+                commit=True
+            )
+
+        # Verify that sirocco_tmp database does NOT exist (it should use sirocco directly)
+        assert f"{TEST_DB_NAME}_tmp" not in ch.get_databases(), "Temporary database should not exist with ignore_deletes=True"
+
+        # Resume initial replication - this should NOT fail with "Database sirocco_tmp does not exist"
+        db_replicator_runner_2 = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
+        db_replicator_runner_2.run()
+
+        # Wait for all records to be replicated (100 original + 50 extra = 150)
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 150, max_wait_time=30)
+
+        # Verify the replication completed successfully
+        records = ch.select(TEST_TABLE_NAME)
+        assert len(records) == 150, f"Expected 150 records, got {len(records)}"
+
+        # Verify we can continue with realtime replication
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES ('realtime_test', 'realtime_data');", commit=True)
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 151)
+
+        # Clean up
+        db_replicator_runner_2.stop()
+        binlog_replicator_runner.stop()
+
+    finally:
+        # Clean up temp config file
+        os.unlink(config_file)
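
The test verifies the interrupted run really stopped mid-flight by reloading the pickled state file and checking its status value. A hedged sketch of that check (DbReplicatorState and the state.pckl path are taken from the test; apart from PERFORMING_INITIAL_REPLICATION = 2, confirmed by the test's comment, the enum member names are illustrative assumptions):

    import enum

    class Status(enum.Enum):
        NONE = 0                             # assumed
        CREATING_INITIAL_STRUCTURES = 1      # assumed
        PERFORMING_INITIAL_REPLICATION = 2   # value asserted by the test
        RUNNING_REALTIME_REPLICATION = 3     # assumed

    def stopped_mid_initial_replication(status_value: int) -> bool:
        # Mirrors the test's assertion: state.status.value == 2
        return Status(status_value) is Status.PERFORMING_INITIAL_REPLICATION

    assert stopped_mid_initial_replication(2)

The fixture below drives the reworked tests with initial_replication_batch_size: 1, so pagination and resume logic are exercised on every single row.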

tests_config_string_primary_key.yaml
Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+  binlog_retention_period: 43200 # 12 hours in seconds
+
+databases: '*test*'
+log_level: 'debug'
+optimize_interval: 3
+check_db_updated_interval: 3
+initial_replication_batch_size: 1
+
+target_databases:
+  replication-test_db_2: replication-destination
+
+indexes:
+  - databases: '*'
+    tables: ['group']
+    index: 'INDEX name_idx name TYPE ngrambf_v1(5, 65536, 4, 0) GRANULARITY 1'
+
+http_host: 'localhost'
+http_port: 9128
+
+types_mapping:
+  'char(36)': 'UUID'
