Skip to content

Commit 566057a

Browse files
authored
Added option to cast some columns to type expected by Tarantool (#24)
1 parent 2bc7b2c commit 566057a

File tree

11 files changed

+213
-45
lines changed

11 files changed

+213
-45
lines changed

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,34 @@ Replicator reads primary keys from MySQL table info and sync them automatically.
3838
Updating primary key in MySQL causes two Tarantool requests: delete an old row and insert a new one, because
3939
it is illegal to update primary key in Tarantool.
4040

41+
### Force cast MySQL value
42+
43+
Replicator can cast the value from MySQL to the required type
44+
if your Tarantool schema does not comply with the MySQL schema.
45+
For example, MySQL column stores `bigint(20)` values, but Tarantool
46+
expects `unsigned`.
47+
Without explicit casting you get an error, e.g.:
48+
> Tuple field 1 type does not match one required by operation: expected unsigned
49+
50+
Supported types to cast to:
51+
* `unsigned`: try to cast any number to unsigned value.
52+
53+
To enable this feature specify which column should be casted:
54+
55+
```yaml
56+
...
57+
mappings:
58+
- source:
59+
schema: 'city'
60+
table: 'users'
61+
columns:
62+
- client_id
63+
dest:
64+
space: 'users'
65+
cast:
66+
client_id: 'unsigned'
67+
```
68+
4169
## Docker image
4270
4371
Image available at [Docker Hub](https://hub.docker.com/r/pparshin/go-mysql-tarantool).

docker/mysql/init.d/init.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ CREATE TABLE logins
1414
username varchar(16) not null,
1515
ip varchar(16) not null,
1616
date int unsigned not null,
17-
attempts int unsigned default 0,
17+
attempts bigint(20) default 0,
1818
longitude float unsigned default 0,
1919
latitude float unsigned default 0,
2020

internal/bridge/attribute.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,29 @@ const (
2727
typePoint // coordinates
2828
)
2929

30+
type castType int
31+
32+
const (
33+
castNone castType = iota // do not cast
34+
castUnsigned // unsigned
35+
)
36+
37+
func castTypeFromString(str string) castType {
38+
switch str {
39+
case "unsigned":
40+
return castUnsigned
41+
default:
42+
return castNone
43+
}
44+
}
45+
3046
// attribute represents MySQL column mapped to Tarantool.
3147
type attribute struct {
3248
colIndex uint64 // column sequence number in MySQL table
3349
tupIndex uint64 // attribute sequence number in Tarantool tuple
3450
name string // unique attribute name
35-
vtype attrType // value type stored in the column
51+
vType attrType // value type stored in the column
52+
cType castType // value must be casted to this type
3653
unsigned bool // whether attribute contains unsigned number or not
3754
}
3855

@@ -48,7 +65,8 @@ func newAttr(table *schema.Table, tupIndex uint64, name string) (*attribute, err
4865
colIndex: uint64(idx),
4966
tupIndex: tupIndex,
5067
name: col.Name,
51-
vtype: attrType(col.Type),
68+
vType: attrType(col.Type),
69+
cType: castNone,
5270
unsigned: col.IsUnsigned,
5371
}, nil
5472
}
@@ -61,13 +79,19 @@ func newAttrsFromPKs(table *schema.Table) []*attribute {
6179
colIndex: uint64(pki),
6280
tupIndex: uint64(i),
6381
name: col.Name,
82+
vType: attrType(col.Type),
83+
cType: castNone,
6484
unsigned: col.IsUnsigned,
6585
})
6686
}
6787

6888
return pks
6989
}
7090

91+
func (a *attribute) castTo(t castType) {
92+
a.cType = t
93+
}
94+
7195
func (a *attribute) fetchValue(row []interface{}) (interface{}, error) {
7296
if a.colIndex >= uint64(len(row)) {
7397
return nil, fmt.Errorf("column index (%d) equals or greater than row length (%d)", a.colIndex, len(row))
@@ -76,7 +100,12 @@ func (a *attribute) fetchValue(row []interface{}) (interface{}, error) {
76100
value := row[a.colIndex]
77101

78102
if a.shouldCastToUInt64(value) {
79-
return toUint64(value)
103+
v, err := toUint64(value)
104+
if err != nil {
105+
return nil, err
106+
}
107+
108+
return v, nil
80109
}
81110

82111
return value, nil
@@ -87,11 +116,15 @@ func (a *attribute) shouldCastToUInt64(value interface{}) bool {
87116
return false
88117
}
89118

119+
if a.cType == castUnsigned {
120+
return true
121+
}
122+
90123
if !a.unsigned {
91124
return false
92125
}
93126

94-
return a.vtype == typeNumber || a.vtype == typeMediumInt
127+
return a.vType == typeNumber || a.vType == typeMediumInt
95128
}
96129

97130
func toUint64(i interface{}) (uint64, error) {

internal/bridge/attribute_test.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ func Test_attribute_fetchValue(t *testing.T) {
1111
colIndex uint64
1212
tupIndex uint64
1313
name string
14-
vtype attrType
14+
vType attrType
15+
cType castType
1516
unsigned bool
1617
}
1718
type args struct {
@@ -30,7 +31,8 @@ func Test_attribute_fetchValue(t *testing.T) {
3031
colIndex: 0,
3132
tupIndex: 0,
3233
name: "name",
33-
vtype: typeString,
34+
vType: typeString,
35+
cType: castNone,
3436
unsigned: false,
3537
},
3638
args: args{
@@ -45,7 +47,8 @@ func Test_attribute_fetchValue(t *testing.T) {
4547
colIndex: 0,
4648
tupIndex: 0,
4749
name: "speed",
48-
vtype: typeNumber,
50+
vType: typeNumber,
51+
cType: castNone,
4952
unsigned: false,
5053
},
5154
args: args{
@@ -54,13 +57,46 @@ func Test_attribute_fetchValue(t *testing.T) {
5457
want: -20,
5558
wantErr: false,
5659
},
60+
{
61+
name: "Number_ForceCastToUnsigned",
62+
fields: fields{
63+
colIndex: 0,
64+
tupIndex: 0,
65+
name: "speed",
66+
vType: typeNumber,
67+
cType: castUnsigned,
68+
unsigned: false,
69+
},
70+
args: args{
71+
row: []interface{}{20},
72+
},
73+
want: uint64(20),
74+
wantErr: false,
75+
},
76+
{
77+
name: "Float_ForceCastToUnsigned_Error",
78+
fields: fields{
79+
colIndex: 0,
80+
tupIndex: 0,
81+
name: "speed",
82+
vType: typeFloat,
83+
cType: castUnsigned,
84+
unsigned: false,
85+
},
86+
args: args{
87+
row: []interface{}{4654.123},
88+
},
89+
want: 0,
90+
wantErr: true,
91+
},
5792
{
5893
name: "UnsignedMediumInt",
5994
fields: fields{
6095
colIndex: 0,
6196
tupIndex: 0,
6297
name: "id",
63-
vtype: typeMediumInt,
98+
vType: typeMediumInt,
99+
cType: castNone,
64100
unsigned: true,
65101
},
66102
args: args{
@@ -75,7 +111,8 @@ func Test_attribute_fetchValue(t *testing.T) {
75111
colIndex: 0,
76112
tupIndex: 0,
77113
name: "id",
78-
vtype: typeNumber,
114+
vType: typeNumber,
115+
cType: castNone,
79116
unsigned: true,
80117
},
81118
args: args{
@@ -90,7 +127,8 @@ func Test_attribute_fetchValue(t *testing.T) {
90127
colIndex: 1,
91128
tupIndex: 1,
92129
name: "name",
93-
vtype: typeString,
130+
vType: typeString,
131+
cType: castNone,
94132
unsigned: false,
95133
},
96134
args: args{
@@ -105,7 +143,8 @@ func Test_attribute_fetchValue(t *testing.T) {
105143
colIndex: 5,
106144
tupIndex: 5,
107145
name: "name",
108-
vtype: typeString,
146+
vType: typeString,
147+
cType: castNone,
109148
unsigned: false,
110149
},
111150
args: args{
@@ -123,7 +162,8 @@ func Test_attribute_fetchValue(t *testing.T) {
123162
colIndex: tt.fields.colIndex,
124163
tupIndex: tt.fields.tupIndex,
125164
name: tt.fields.name,
126-
vtype: tt.fields.vtype,
165+
vType: tt.fields.vType,
166+
cType: tt.fields.cType,
127167
unsigned: tt.fields.unsigned,
128168
}
129169
got, err := a.fetchValue(tt.args.row)

internal/bridge/replicator.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func (b *Bridge) newRules(cfg *config.Config) error {
9797
rules := make(map[string]*rule, len(cfg.Replication.Mappings))
9898
for _, mapping := range cfg.Replication.Mappings {
9999
source := mapping.Source
100+
cast := mapping.Dest.Cast
100101

101102
tableInfo, err := b.canal.GetTable(source.Schema, source.Table)
102103
if err != nil {
@@ -107,6 +108,10 @@ func (b *Bridge) newRules(cfg *config.Config) error {
107108
if len(pks) == 0 {
108109
return fmt.Errorf("no primary keys found, schema: %s, table: %s", source.Schema, source.Table)
109110
}
111+
for _, pk := range pks {
112+
typ := castTypeFromString(cast[pk.name])
113+
pk.castTo(typ)
114+
}
110115

111116
attrs := make([]*attribute, 0, len(source.Columns))
112117
for i, name := range source.Columns {
@@ -125,6 +130,9 @@ func (b *Bridge) newRules(cfg *config.Config) error {
125130
if err != nil {
126131
return err
127132
}
133+
typ := castTypeFromString(cast[name])
134+
attr.castTo(typ)
135+
128136
attrs = append(attrs, attr)
129137
}
130138
}

internal/bridge/replicator_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path"
88
"path/filepath"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -291,6 +292,57 @@ func (s *bridgeSuite) TestUpdatePrimaryKeys() {
291292
}
292293
}
293294

295+
func (s *bridgeSuite) TestForceCast() {
296+
t := s.T()
297+
298+
for step := 1; step <= 2; step++ {
299+
wantErr := step == 2
300+
301+
cfg := *s.cfg
302+
if wantErr {
303+
for i := range cfg.Replication.Mappings {
304+
cfg.Replication.Mappings[i].Dest.Cast = nil
305+
}
306+
}
307+
308+
s.init(&cfg)
309+
310+
var wg sync.WaitGroup
311+
wg.Add(1)
312+
313+
go func() {
314+
defer wg.Done()
315+
316+
errors := s.bridge.Run()
317+
if wantErr {
318+
assert.Len(t, errors, 1)
319+
} else {
320+
for err := range errors {
321+
assert.NoError(t, err)
322+
}
323+
}
324+
}()
325+
326+
name := fmt.Sprintf("alice_%d", step)
327+
_, err := s.executeSQL("INSERT INTO city.logins (username, ip, date, attempts, longitude, latitude) VALUES (?, ?, ?, ?, ?, ?)", name, "192.168.1.1", 1604571708, 404, 73.98, 40.74)
328+
require.NoError(t, err)
329+
330+
if wantErr {
331+
wg.Wait()
332+
} else {
333+
err = s.bridge.canal.CatchMasterPos(500 * time.Millisecond)
334+
require.NoError(t, err)
335+
336+
require.Eventually(t, func() bool {
337+
return s.hasSyncedData("logins", 1)
338+
}, 500*time.Millisecond, 50*time.Millisecond)
339+
}
340+
341+
err = s.bridge.Close()
342+
assert.NoError(t, err)
343+
}
344+
}
345+
294346
func (s *bridgeSuite) TestReconnect() {
295347
t := s.T()
296348

0 commit comments

Comments
 (0)