Commit 1c859f9

Handle numeric data type (#114)

1 parent caac6bf, commit 1c859f9

File tree: 2 files changed (+142, -0)

2 files changed

+142
-0
lines changed

mysql_ch_replicator/converter.py

Lines changed: 100 additions & 0 deletions
@@ -196,6 +196,44 @@ def convert_type(self, mysql_type, parameters):
         if mysql_type == 'point':
             return 'Tuple(x Float32, y Float32)'

+        # Correctly handle numeric types
+        if mysql_type.startswith('numeric'):
+            # Determine if parameters are specified via parentheses:
+            if '(' in mysql_type and ')' in mysql_type:
+                # Expecting a type definition like "numeric(precision, scale)"
+                pattern = r"numeric\((\d+)\s*,\s*(\d+)\)"
+                match = re.search(pattern, mysql_type)
+                if not match:
+                    raise ValueError(f"Invalid numeric type definition: {mysql_type}")
+
+                precision = int(match.group(1))
+                scale = int(match.group(2))
+            else:
+                # If no parentheses are provided, assume defaults.
+                precision = 10  # or other default as defined by your standards
+                scale = 0
+
+            # If no fractional part, consider mapping to integer type (if desired)
+            if scale == 0:
+                if is_unsigned:
+                    if precision <= 9:
+                        return "UInt32"
+                    elif precision <= 18:
+                        return "UInt64"
+                    else:
+                        # For very large precisions, fallback to Decimal
+                        return f"Decimal({precision}, {scale})"
+                else:
+                    if precision <= 9:
+                        return "Int32"
+                    elif precision <= 18:
+                        return "Int64"
+                    else:
+                        return f"Decimal({precision}, {scale})"
+            else:
+                # For types with a defined fractional part, use a Decimal mapping.
+                return f"Decimal({precision}, {scale})"
+
         if mysql_type == 'int':
             if is_unsigned:
                 return 'UInt32'
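
For reference, the branch added in the hunk above maps MySQL NUMERIC declarations to ClickHouse types roughly as sketched below. This is a standalone restatement for illustration only, not the converter's API: the helper name mysql_numeric_to_clickhouse is hypothetical, and the ValueError the committed code raises for malformed definitions such as numeric(5) is omitted here.

import re

def mysql_numeric_to_clickhouse(mysql_type: str, is_unsigned: bool = False) -> str:
    # Hypothetical standalone restatement of the rule added in this commit.
    match = re.search(r"numeric\((\d+)\s*,\s*(\d+)\)", mysql_type)
    if match:
        precision, scale = int(match.group(1)), int(match.group(2))
    else:
        precision, scale = 10, 0  # defaults used when no (precision, scale) is given
    if scale == 0:
        if precision <= 9:
            return "UInt32" if is_unsigned else "Int32"
        if precision <= 18:
            return "UInt64" if is_unsigned else "Int64"
    # Large precisions and any non-zero scale fall back to Decimal.
    return f"Decimal({precision}, {scale})"

assert mysql_numeric_to_clickhouse("numeric(5, 2)") == "Decimal(5, 2)"
assert mysql_numeric_to_clickhouse("numeric(8, 0)") == "Int32"
assert mysql_numeric_to_clickhouse("numeric(12, 0)", is_unsigned=True) == "UInt64"
assert mysql_numeric_to_clickhouse("numeric(20, 0)") == "Decimal(20, 0)"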
@@ -472,7 +510,69 @@ def convert_alter_query(self, mysql_query, db_name):

             raise Exception(f'operation {op_name} not implement, query: {subquery}')

+    @classmethod
+    def _tokenize_alter_query(cls, sql_line):
+        # We want to recognize tokens that may be:
+        # 1. A backquoted identifier that can optionally be immediately followed by parentheses.
+        # 2. A plain word (letters/digits/underscore) that may immediately be followed by a parenthesized argument list.
+        # 3. A single-quoted or double-quoted string.
+        # 4. Or, if nothing else, any non-whitespace sequence.
+        #
+        # The order is important: for example, if a word is immediately followed by parentheses,
+        # we want to grab it as a single token.
+        token_pattern = re.compile(r'''
+            (                              # start capture group for a token
+              `[^`]+`(?:\([^)]*\))?   |    # backquoted identifier w/ optional parentheses
+              \w+(?:\([^)]*\))?       |    # a word with optional parentheses
+              '(?:\\'|[^'])*'         |    # a single-quoted string
+              "(?:\\"|[^"])*"         |    # a double-quoted string
+              [^\s]+                       # fallback: any sequence of non-whitespace characters
+            )
+        ''', re.VERBOSE)
+        tokens = token_pattern.findall(sql_line)
+
+        # Now, split the column definition into:
+        #   token0 = column name,
+        #   token1 = data type (which might be multiple tokens, e.g. DOUBLE PRECISION, INT UNSIGNED,
+        #            or a word+parentheses like VARCHAR(254) or NUMERIC(5, 2)),
+        #   remaining tokens: the parameters such as DEFAULT, NOT, etc.
+        #
+        # We define a set of keywords that indicate the start of column options.
+        constraint_keywords = {
+            "DEFAULT", "NOT", "NULL", "AUTO_INCREMENT", "PRIMARY", "UNIQUE",
+            "COMMENT", "COLLATE", "REFERENCES", "ON", "CHECK", "CONSTRAINT",
+            "AFTER", "BEFORE", "GENERATED", "VIRTUAL", "STORED", "FIRST",
+            "ALWAYS", "AS", "IDENTITY", "INVISIBLE", "PERSISTED",
+        }
+
+        if not tokens:
+            return tokens
+        # The first token is always the column name.
+        column_name = tokens[0]
+
+        # Now "merge" tokens after the column name that belong to the type.
+        # (For many types the type is written as a single token already --
+        #  e.g. "VARCHAR(254)" or "NUMERIC(5, 2)", but for types like
+        #  "DOUBLE PRECISION" or "INT UNSIGNED" the .split() would produce two tokens.)
+        type_tokens = []
+        i = 1
+        while i < len(tokens) and tokens[i].upper() not in constraint_keywords:
+            type_tokens.append(tokens[i])
+            i += 1
+        merged_type = " ".join(type_tokens) if type_tokens else ""
+
+        # The remaining tokens are passed through unchanged.
+        param_tokens = tokens[i:]
+
+        # Result: [column name, merged type, all the rest]
+        if merged_type:
+            return [column_name, merged_type] + param_tokens
+        else:
+            return [column_name] + param_tokens
+
     def __convert_alter_table_add_column(self, db_name, table_name, tokens):
+        tokens = self._tokenize_alter_query(' '.join(tokens))
+
         if len(tokens) < 2:
             raise Exception('wrong tokens count', tokens)

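The point of a dedicated tokenizer rather than plain whitespace splitting is that a type such as NUMERIC(5, 2) contains a space, so splitting the column definition on whitespace tears it apart; the committed comment notes that .split() would also produce two tokens for DOUBLE PRECISION or INT UNSIGNED. A minimal sketch of the difference, reusing the regex from the hunk above (the constant name TOKEN_PATTERN is ours, not the project's):

import re

# Same token pattern as in the diff above, reproduced for a quick comparison.
TOKEN_PATTERN = re.compile(r'''
    (
      `[^`]+`(?:\([^)]*\))?   |   # backquoted identifier w/ optional parentheses
      \w+(?:\([^)]*\))?       |   # a word with optional parentheses
      '(?:\\'|[^'])*'         |   # a single-quoted string
      "(?:\\"|[^"])*"         |   # a double-quoted string
      [^\s]+                      # fallback: any non-whitespace run
    )
''', re.VERBOSE)

definition = "factor NUMERIC(5, 2) DEFAULT NULL"

# Naive splitting breaks the type in two at the space after the comma.
print(definition.split())                 # ['factor', 'NUMERIC(5,', '2)', 'DEFAULT', 'NULL']

# The regex keeps "NUMERIC(5, 2)" together as a single token.
print(TOKEN_PATTERN.findall(definition))  # ['factor', 'NUMERIC(5, 2)', 'DEFAULT', 'NULL']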
test_mysql_ch_replicator.py

Lines changed: 42 additions & 0 deletions
@@ -5,6 +5,7 @@
 import subprocess
 import json
 import uuid
+import decimal

 import pytest
 import requests
@@ -276,6 +277,12 @@ def test_e2e_multistatement():
     mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE name='Ivan';", commit=True)
     assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

+    mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD factor NUMERIC(5, 2) DEFAULT NULL;")
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, factor) VALUES ('Snow', 31, 13.29);", commit=True)
+
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
+    assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Snow'")[0].get('factor') == decimal.Decimal('13.29'))
+
     mysql.execute(
         f"CREATE TABLE {TEST_TABLE_NAME_2} "
         f"(id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, "
@@ -1493,3 +1500,38 @@ def _get_last_insert_name():
     print("*****************************")
     print('\n\n')

+
+def test_alter_tokens_split():
+    examples = [
+        # basic examples from the prompt:
+        ("test_name VARCHAR(254) NULL", ["test_name", "VARCHAR(254)", "NULL"]),
+        ("factor NUMERIC(5, 2) DEFAULT NULL", ["factor", "NUMERIC(5, 2)", "DEFAULT", "NULL"]),
+        # backquoted column name:
+        ("`test_name` VARCHAR(254) NULL", ["`test_name`", "VARCHAR(254)", "NULL"]),
+        ("`order` INT NOT NULL", ["`order`", "INT", "NOT", "NULL"]),
+        # type that contains a parenthesized list with quoted values:
+        ("status ENUM('active','inactive') DEFAULT 'active'",
+         ["status", "ENUM('active','inactive')", "DEFAULT", "'active'"]),
+        # multi-word type definitions:
+        ("col DOUBLE PRECISION DEFAULT 0", ["col", "DOUBLE PRECISION", "DEFAULT", "0"]),
+        ("col INT UNSIGNED DEFAULT 0", ["col", "INT UNSIGNED", "DEFAULT", "0"]),
+        # a case with a quoted string containing spaces and punctuation:
+        ("message VARCHAR(100) DEFAULT 'Hello, world!'",
+         ["message", "VARCHAR(100)", "DEFAULT", "'Hello, world!'"]),
+        # longer definition with more options:
+        ("col DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP",
+         ["col", "DATETIME", "DEFAULT", "CURRENT_TIMESTAMP", "ON", "UPDATE", "CURRENT_TIMESTAMP"]),
+        # type with a COMMENT clause (here the type is given, then a parameter keyword)
+        ("col VARCHAR(100) COMMENT 'This is a test comment'",
+         ["col", "VARCHAR(100)", "COMMENT", "'This is a test comment'"]),
+        ("c1 INT FIRST", ["c1", "INT", "FIRST"]),
+    ]
+
+    for sql, expected in examples:
+        result = MysqlToClickhouseConverter._tokenize_alter_query(sql)
+        print("SQL Input: ", sql)
+        print("Expected:  ", expected)
+        print("Tokenized: ", result)
+        print("Match?     ", result == expected)
+        print("-" * 60)
+        assert result == expected

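Since test_alter_tokens_split only exercises the tokenizer classmethod and needs neither a running MySQL nor a ClickHouse instance, it should be runnable on its own. One plausible way to select just this test (an assumption about the usual pytest workflow, not something stated in the commit):

import pytest

# Select the new test by its node id; assumes pytest and the test module's
# imports are available in the current environment.
pytest.main(["-q", "test_mysql_ch_replicator.py::test_alter_tokens_split"])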