Skip to content

Commit 0b82465

Browse files
authored
Detect missing database and recreate & Fix enum conversion uppercase to lowercase etc. (#121)
1 parent 7ce1316 commit 0b82465

File tree

8 files changed

+435
-112
lines changed

8 files changed

+435
-112
lines changed

mysql_ch_replicator/converter.py

Lines changed: 21 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
77

88
from .table_structure import TableStructure, TableField
9-
from .converter_enum_parser import parse_mysql_enum
9+
from .enum import (
10+
parse_mysql_enum, EnumConverter,
11+
parse_enum_or_set_field,
12+
extract_enum_or_set_values
13+
)
1014

1115

1216
CHARSET_MYSQL_TO_PYTHON = {
@@ -282,7 +286,7 @@ def convert_type(self, mysql_type, parameters):
282286
enum_values = parse_mysql_enum(mysql_type)
283287
ch_enum_values = []
284288
for idx, value_name in enumerate(enum_values):
285-
ch_enum_values.append(f"'{value_name}' = {idx+1}")
289+
ch_enum_values.append(f"'{value_name.lower()}' = {idx+1}")
286290
ch_enum_values = ', '.join(ch_enum_values)
287291
if len(enum_values) <= 127:
288292
# Enum8('red' = 1, 'green' = 2, 'black' = 3)
@@ -428,9 +432,15 @@ def convert_record(
428432
if mysql_field_type.startswith('point'):
429433
clickhouse_field_value = parse_mysql_point(clickhouse_field_value)
430434

431-
if mysql_field_type.startswith('enum(') and isinstance(clickhouse_field_value, int):
435+
if mysql_field_type.startswith('enum('):
432436
enum_values = mysql_structure.fields[idx].additional_data
433-
clickhouse_field_value = enum_values[int(clickhouse_field_value)-1]
437+
field_name = mysql_structure.fields[idx].name if idx < len(mysql_structure.fields) else "unknown"
438+
439+
clickhouse_field_value = EnumConverter.convert_mysql_to_clickhouse_enum(
440+
clickhouse_field_value,
441+
enum_values,
442+
field_name
443+
)
434444

435445
clickhouse_record.append(clickhouse_field_value)
436446
return tuple(clickhouse_record)
@@ -834,107 +844,16 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
834844
end_pos = line.find('`', 1)
835845
field_name = line[1:end_pos]
836846
line = line[end_pos + 1 :].strip()
837-
# Don't split by space for enum and set types that might contain spaces
838-
if line.lower().startswith('enum(') or line.lower().startswith('set('):
839-
# Find the end of the enum/set definition (closing parenthesis)
840-
open_parens = 0
841-
in_quotes = False
842-
quote_char = None
843-
end_pos = -1
844-
845-
for i, char in enumerate(line):
846-
if char in "'\"" and (i == 0 or line[i - 1] != "\\"):
847-
if not in_quotes:
848-
in_quotes = True
849-
quote_char = char
850-
elif char == quote_char:
851-
in_quotes = False
852-
elif char == '(' and not in_quotes:
853-
open_parens += 1
854-
elif char == ')' and not in_quotes:
855-
open_parens -= 1
856-
if open_parens == 0:
857-
end_pos = i + 1
858-
break
859-
860-
if end_pos > 0:
861-
field_type = line[:end_pos]
862-
field_parameters = line[end_pos:].strip()
863-
else:
864-
# Fallback to original behavior if we can't find the end
865-
definition = line.split(' ')
866-
field_type = definition[0]
867-
field_parameters = (
868-
' '.join(definition[1:]) if len(definition) > 1 else ''
869-
)
870-
else:
871-
definition = line.split(' ')
872-
field_type = definition[0]
873-
field_parameters = (
874-
' '.join(definition[1:]) if len(definition) > 1 else ''
875-
)
847+
# Use our new enum parsing utilities
848+
field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=True)
876849
else:
877850
definition = line.split(' ')
878851
field_name = strip_sql_name(definition[0])
879-
definition = definition[1:]
880-
if definition and (
881-
definition[0].lower().startswith('enum(')
882-
or definition[0].lower().startswith('set(')
883-
):
884-
line = ' '.join(definition)
885-
# Find the end of the enum/set definition (closing parenthesis)
886-
open_parens = 0
887-
in_quotes = False
888-
quote_char = None
889-
end_pos = -1
890-
891-
for i, char in enumerate(line):
892-
if char in "'\"" and (i == 0 or line[i - 1] != "\\"):
893-
if not in_quotes:
894-
in_quotes = True
895-
quote_char = char
896-
elif char == quote_char:
897-
in_quotes = False
898-
elif char == '(' and not in_quotes:
899-
open_parens += 1
900-
elif char == ')' and not in_quotes:
901-
open_parens -= 1
902-
if open_parens == 0:
903-
end_pos = i + 1
904-
break
905-
906-
if end_pos > 0:
907-
field_type = line[:end_pos]
908-
field_parameters = line[end_pos:].strip()
909-
else:
910-
# Fallback to original behavior
911-
field_type = definition[0]
912-
field_parameters = (
913-
' '.join(definition[1:]) if len(definition) > 1 else ''
914-
)
915-
else:
916-
field_type = definition[0]
917-
field_parameters = (
918-
' '.join(definition[1:]) if len(definition) > 1 else ''
919-
)
920-
921-
additional_data = None
922-
if 'set(' in field_type.lower():
923-
vals = field_type[len('set('):]
924-
close_pos = vals.find(')')
925-
vals = vals[:close_pos]
926-
vals = vals.split(',')
927-
def vstrip(e):
928-
if not e:
929-
return e
930-
if e[0] in '"\'':
931-
return e[1:-1]
932-
return e
933-
vals = [vstrip(v) for v in vals]
934-
additional_data = vals
935-
936-
if field_type.lower().startswith('enum('):
937-
additional_data = parse_mysql_enum(field_type)
852+
# Use our new enum parsing utilities
853+
field_name, field_type, field_parameters = parse_enum_or_set_field(line, field_name, is_backtick_quoted=False)
854+
855+
# Extract additional data for enum and set types
856+
additional_data = extract_enum_or_set_values(field_type, from_parser_func=parse_mysql_enum)
938857

939858
structure.fields.append(TableField(
940859
name=field_name,

mysql_ch_replicator/db_replicator.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,11 @@ def run(self):
180180

181181
if self.state.status != Status.NONE:
182182
# ensure target database still exists
183-
if self.target_database not in self.clickhouse_api.get_databases():
183+
if self.target_database not in self.clickhouse_api.get_databases() and f"{self.target_database}_tmp" not in self.clickhouse_api.get_databases():
184184
logger.warning(f'database {self.target_database} missing in CH')
185-
if self.initial_only:
186-
logger.warning('will run replication from scratch')
187-
self.state.remove()
188-
self.state = self.create_state()
185+
logger.warning('will run replication from scratch')
186+
self.state.remove()
187+
self.state = self.create_state()
189188

190189
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
191190
self.run_realtime_replication()
@@ -227,6 +226,10 @@ def create_initial_structure_table(self, table_name):
227226
)
228227
self.validate_mysql_structure(mysql_structure)
229228
clickhouse_structure = self.converter.convert_table_structure(mysql_structure)
229+
230+
# Always set if_not_exists to True to prevent errors when tables already exist
231+
clickhouse_structure.if_not_exists = True
232+
230233
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
231234
indexes = self.config.get_indexes(self.database, table_name)
232235
self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)

mysql_ch_replicator/enum/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from .parser import parse_mysql_enum, is_enum_type
2+
from .converter import EnumConverter
3+
from .utils import find_enum_definition_end, extract_field_components
4+
from .ddl_parser import (
5+
find_enum_or_set_definition_end,
6+
parse_enum_or_set_field,
7+
extract_enum_or_set_values,
8+
strip_value
9+
)
10+
11+
__all__ = [
12+
'parse_mysql_enum',
13+
'is_enum_type',
14+
'EnumConverter',
15+
'find_enum_definition_end',
16+
'extract_field_components',
17+
'find_enum_or_set_definition_end',
18+
'parse_enum_or_set_field',
19+
'extract_enum_or_set_values',
20+
'strip_value'
21+
]

mysql_ch_replicator/enum/converter.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from typing import List, Union, Optional, Any
2+
from logging import getLogger
3+
4+
# Create a single module-level logger
5+
logger = getLogger(__name__)
6+
7+
class EnumConverter:
    """Convert MySQL enum values into the form written to ClickHouse."""

    @staticmethod
    def convert_mysql_to_clickhouse_enum(
        value: Any,
        enum_values: List[str],
        field_name: str = "unknown"
    ) -> Optional[Union[str, int]]:
        """
        Convert a MySQL enum value to the appropriate ClickHouse representation.

        Args:
            value: The MySQL enum value. May be ``None``, a 1-based integer
                index into ``enum_values``, or the enum label as a string.
            enum_values: Enum labels in declaration order.
            field_name: Name of the field; used only in log messages.

        Returns:
            ``None`` for ``None``; ``0`` for index 0; the lowercased label for
            a valid index or for a string that matches a label
            case-insensitively. Any other input (out-of-range index, unknown
            string, unexpected type) is logged as an error and returned
            unchanged.
        """
        # NULL passes straight through.
        if value is None:
            return None

        # Index-based values.
        if isinstance(value, int):
            if value == 0:
                # Index 0 is not a declared label; keep it as 0 and leave the
                # decision to ClickHouse (depends on the column's nullability).
                logger.debug(
                    "ENUM CONVERSION: Found enum index 0 for field '%s'. Keeping as 0.",
                    field_name,
                )
                return 0

            if 1 <= value <= len(enum_values):
                # MySQL indexes are 1-based; labels are stored lowercased.
                return enum_values[value - 1].lower()

            logger.error(
                "ENUM CONVERSION: Invalid enum index %s for field '%s' with values %s",
                value,
                field_name,
                enum_values,
            )
            return value

        # Label-based values.
        if isinstance(value, str):
            lowered = value.lower()
            # A case-insensitive comparison also covers the exact match, so a
            # single short-circuiting scan is enough.
            if any(lowered == candidate.lower() for candidate in enum_values):
                return lowered

            logger.error(
                "ENUM CONVERSION: Invalid enum value '%s' not in %s for field '%s'",
                value,
                enum_values,
                field_name,
            )
            return value

        # Anything else is unexpected; report it and hand it back untouched.
        logger.error(
            "ENUM CONVERSION: Unexpected type %s for enum field '%s'",
            type(value),
            field_name,
        )
        return value

0 commit comments

Comments
 (0)