@@ -4,19 +4,15 @@ namespace PowerSync.Common.Client.Sync.Bucket;
4
4
using System . Collections . Generic ;
5
5
using System . Linq ;
6
6
using System . Threading . Tasks ;
7
-
8
7
using Microsoft . Extensions . Logging ;
9
8
using Microsoft . Extensions . Logging . Abstractions ;
10
-
11
9
using Newtonsoft . Json ;
12
-
13
10
using PowerSync . Common . DB ;
14
11
using PowerSync . Common . DB . Crud ;
15
12
using PowerSync . Common . Utils ;
16
13
17
14
public class SqliteBucketStorage : EventStream < BucketStorageEvent > , IBucketStorageAdapter
18
15
{
19
-
20
16
public static readonly string MAX_OP_ID = "9223372036854775807" ;
21
17
22
18
private readonly IDBAdapter db ;
@@ -37,7 +33,8 @@ private record ExistingTableRowsResult(string name);
37
33
public SqliteBucketStorage ( IDBAdapter db , ILogger ? logger = null )
38
34
{
39
35
this . db = db ;
40
- this . logger = logger ?? NullLogger . Instance ; ;
36
+ this . logger = logger ?? NullLogger . Instance ;
37
+ ;
41
38
hasCompletedSync = false ;
42
39
pendingBucketDeletes = true ;
43
40
tableNames = [ ] ;
@@ -62,9 +59,10 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
62
59
63
60
public async Task Init ( )
64
61
{
65
-
66
62
hasCompletedSync = false ;
67
- var existingTableRows = await db . GetAll < ExistingTableRowsResult > ( "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" ) ;
63
+ var existingTableRows =
64
+ await db . GetAll < ExistingTableRowsResult > (
65
+ "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" ) ;
68
66
69
67
foreach ( var row in existingTableRows )
70
68
{
@@ -79,6 +77,7 @@ public async Task Init()
79
77
}
80
78
81
79
private record ClientIdResult ( string ? client_id ) ;
80
+
82
81
public async Task < string > GetClientId ( )
83
82
{
84
83
if ( clientId == null )
@@ -95,12 +94,15 @@ public string GetMaxOpId()
95
94
return MAX_OP_ID ;
96
95
}
97
96
98
- public void StartSession ( ) { }
97
+ public void StartSession ( )
98
+ {
99
+ }
99
100
100
101
public async Task < BucketState [ ] > GetBucketStates ( )
101
102
{
102
103
return
103
- await db . GetAll < BucketState > ( "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'" ) ;
104
+ await db . GetAll < BucketState > (
105
+ "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'" ) ;
104
106
}
105
107
106
108
public async Task SaveSyncData ( SyncDataBatch batch )
@@ -115,6 +117,7 @@ await db.WriteTransaction(async tx =>
115
117
logger . LogDebug ( "saveSyncData {message}" , JsonConvert . SerializeObject ( result ) ) ;
116
118
count += b . Data . Length ;
117
119
}
120
+
118
121
compactCounter += count ;
119
122
} ) ;
120
123
}
@@ -140,6 +143,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
140
143
}
141
144
142
145
private record LastSyncedResult ( string ? synced_at ) ;
146
+
143
147
public async Task < bool > HasCompletedSync ( )
144
148
{
145
149
if ( hasCompletedSync ) return true ;
@@ -155,11 +159,13 @@ public async Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoi
155
159
var validation = await ValidateChecksums ( checkpoint ) ;
156
160
if ( ! validation . CheckpointValid )
157
161
{
158
- logger . LogError ( "Checksums failed for {failures}" , JsonConvert . SerializeObject ( validation . CheckpointFailures ) ) ;
162
+ logger . LogError ( "Checksums failed for {failures}" ,
163
+ JsonConvert . SerializeObject ( validation . CheckpointFailures ) ) ;
159
164
foreach ( var failedBucket in validation . CheckpointFailures ?? [ ] )
160
165
{
161
166
await DeleteBucket ( failedBucket ) ;
162
167
}
168
+
163
169
return new SyncLocalDatabaseResult
164
170
{
165
171
Ready = false ,
@@ -210,7 +216,7 @@ private async Task<bool> UpdateObjectsFromBuckets(Checkpoint checkpoint)
210
216
return await db . WriteTransaction ( async tx =>
211
217
{
212
218
var result = await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
213
- [ "sync_local" , "" ] ) ;
219
+ [ "sync_local" , "" ] ) ;
214
220
215
221
return result . InsertId == 1 ;
216
222
} ) ;
@@ -220,18 +226,16 @@ private record ResultResult(object result);
220
226
221
227
public class ResultDetail
222
228
{
223
- [ JsonProperty ( "valid" ) ]
224
- public bool Valid { get ; set ; }
229
+ [ JsonProperty ( "valid" ) ] public bool Valid { get ; set ; }
225
230
226
- [ JsonProperty ( "failed_buckets" ) ]
227
- public List < string > ? FailedBuckets { get ; set ; }
231
+ [ JsonProperty ( "failed_buckets" ) ] public List < string > ? FailedBuckets { get ; set ; }
228
232
}
229
233
230
234
public async Task < SyncLocalDatabaseResult > ValidateChecksums (
231
235
Checkpoint checkpoint )
232
236
{
233
237
var result = await db . Get < ResultResult > ( "SELECT powersync_validate_checkpoint(?) as result" ,
234
- [ JsonConvert . SerializeObject ( checkpoint ) ] ) ;
238
+ [ JsonConvert . SerializeObject ( checkpoint ) ] ) ;
235
239
236
240
logger . LogDebug ( "validateChecksums result item {message}" , JsonConvert . SerializeObject ( result ) ) ;
237
241
@@ -298,6 +302,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
298
302
}
299
303
300
304
private record TargetOpResult ( string target_op ) ;
305
+
301
306
private record SequenceResult ( int seq ) ;
302
307
303
308
public async Task < bool > UpdateLocalTarget ( Func < Task < string > > callback )
@@ -351,16 +356,18 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
351
356
352
357
if ( seqAfter != seqBefore )
353
358
{
354
- logger . LogDebug ( "[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})" , seqAfter , seqBefore ) ;
359
+ logger . LogDebug ( "[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})" , seqAfter ,
360
+ seqBefore ) ;
355
361
return false ;
356
362
}
357
363
358
364
var response = await tx . Execute (
359
- "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
360
- [ opId ]
361
- ) ;
365
+ "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
366
+ [ opId ]
367
+ ) ;
362
368
363
- logger . LogDebug ( "[updateLocalTarget] Response from updating target_op: {response}" , JsonConvert . SerializeObject ( response ) ) ;
369
+ logger . LogDebug ( "[updateLocalTarget] Response from updating target_op: {response}" ,
370
+ JsonConvert . SerializeObject ( response ) ) ;
364
371
return true ;
365
372
} ) ;
366
373
}
@@ -388,33 +395,33 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
388
395
var last = all [ all . Length - 1 ] ;
389
396
390
397
return new CrudBatch (
391
- Crud : all ,
392
- HaveMore : true ,
393
- CompleteCallback : async ( string ? writeCheckpoint ) =>
394
- {
395
- await db . WriteTransaction ( async tx =>
398
+ Crud : all ,
399
+ HaveMore : true ,
400
+ CompleteCallback : async ( string ? writeCheckpoint ) =>
396
401
{
397
- await tx . Execute ( "DELETE FROM ps_crud WHERE id <= ?" , [ last . ClientId ] ) ;
398
-
399
- if ( ! string . IsNullOrEmpty ( writeCheckpoint ) )
402
+ await db . WriteTransaction ( async tx =>
400
403
{
401
- var crudResult = await tx . GetAll < object > ( "SELECT 1 FROM ps_crud LIMIT 1" ) ;
402
- if ( crudResult ? . Length > 0 )
404
+ await tx . Execute ( "DELETE FROM ps_crud WHERE id <= ?" , [ last . ClientId ] ) ;
405
+
406
+ if ( ! string . IsNullOrEmpty ( writeCheckpoint ) )
407
+ {
408
+ var crudResult = await tx . GetAll < object > ( "SELECT 1 FROM ps_crud LIMIT 1" ) ;
409
+ if ( crudResult ? . Length > 0 )
410
+ {
411
+ await tx . Execute (
412
+ "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
413
+ [ writeCheckpoint ] ) ;
414
+ }
415
+ }
416
+ else
403
417
{
404
418
await tx . Execute (
405
419
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
406
- [ writeCheckpoint ] ) ;
420
+ [ GetMaxOpId ( ) ] ) ;
407
421
}
408
- }
409
- else
410
- {
411
- await tx . Execute (
412
- "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
413
- [ GetMaxOpId ( ) ] ) ;
414
- }
415
- } ) ;
416
- }
417
- ) ;
422
+ } ) ;
423
+ }
424
+ ) ;
418
425
}
419
426
420
427
public async Task < CrudEntry ? > NextCrudItem ( )
@@ -436,13 +443,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint)
436
443
}
437
444
438
445
record ControlResult ( string ? r ) ;
446
+
439
447
public async Task < string > Control ( string op , object ? payload )
440
448
{
441
449
return await db . WriteTransaction ( async tx =>
442
450
{
443
- var result = await tx . Get < ControlResult > ( "SELECT powersync_control(?, ?) AS r" , [ op , payload ] ) ;
444
- Console . WriteLine ( "Control Response: " + result . r ) ;
445
- return result . r ;
446
- } ) ;
451
+ var result = await tx . Get < ControlResult > ( "SELECT powersync_control(?, ?) AS r" , [ op , payload ] ) ;
452
+
453
+
454
+ return result . r ;
455
+ } ) ;
447
456
}
448
- }
457
+ }
0 commit comments