@@ -16,6 +16,7 @@ import (
16
16
"encoding/base64"
17
17
"fmt"
18
18
"iter"
19
+ "maps"
19
20
"net/http"
20
21
"slices"
21
22
"strconv"
@@ -299,36 +300,93 @@ func (cd *clientDoc) _proposeChangesEntryForDoc() *proposeChangeBatchEntry {
299
300
return & proposeChangeBatchEntry {docID : cd .id , version : latestRev .version , revTreeIDHistory : revTreeIDHistory , hlvHistory : latestRev .HLV , latestServerVersion : cd ._latestServerVersion , seq : cd ._latestSeq , isDelete : latestRev .isDelete }
300
301
}
301
302
302
- type conflictResolutionWinner uint
303
-
304
- const (
305
- remoteDocWinner conflictResolutionWinner = iota // The remote document is the winner in the conflict resolution
306
- localDocWinner // The local document is the winner in the conflict resolution
307
- )
308
-
309
- func (r * clientDocRev ) _pickConflictWinner (t testing.TB , incomingCV db.Version , conflictResolver BlipTesterClientConflictResolverType ) conflictResolutionWinner {
310
- localCV := r .HLV .ExtractCurrentVersionFromHLV ()
311
- // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent
312
- if localCV .Equal (incomingCV ) {
313
- require .FailNow (t , fmt .Sprintf ("incoming CV %#+v is equal to local CV revision %#+v - this should've been filtered via changes response before ending up as a rev" , incomingCV , r ))
303
+ // _getLatestHLVCopy returns a copy of the HLV. If there is no document, return an empty HLV.
304
+ func (cd * clientDoc ) _getLatestHLVCopy (t testing.TB ) db.HybridLogicalVector {
305
+ // Get the latest HLV for the document, if it exists
306
+ if cd == nil {
307
+ return * db .NewHybridLogicalVector ()
314
308
}
315
- if incomingCV .SourceID == localCV .SourceID {
316
- // incomingVersion has the same sourceID as the local version.
309
+ latestRev := cd ._latestRev (t )
310
+ return * latestRev .HLV .Copy ()
311
+ }
317
312
318
- // Potentially we should add a check to make sure that the HLV is not rolling back
319
- /// incomingCV.Value > clientCV.Value
320
- return remoteDocWinner
313
+ func (cd * clientDoc ) _hasConflict (t testing.TB , incomingHLV * db.HybridLogicalVector ) bool {
314
+ // there is no local document
315
+ if cd == nil {
316
+ return false
317
+ }
318
+ latestRev := cd ._latestRev (t )
319
+ if latestRev .version .RevTreeID != "" {
320
+ // currently no conflict detection or resolution for revtree clients.
321
+ return false
321
322
}
322
323
323
- switch conflictResolver {
324
+ localHLV := latestRev .HLV
325
+ incomingCV := incomingHLV .ExtractCurrentVersionFromHLV ()
326
+ localCV := localHLV .ExtractCurrentVersionFromHLV ()
327
+ // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent
328
+ if localCV .Equal (* incomingCV ) {
329
+ require .FailNow (t , fmt .Sprintf ("incoming CV %#+v is equal to local revision %#+v - this should've been filtered via changes response before ending up as a rev. This is only true if there is a single replication occuring, two simultaneous replications (e.g. P2P) could cause this. If there are multiple replications, modify code." , incomingCV , latestRev ))
330
+ }
331
+ // standard no conflict case. In the simple case, this happens when:
332
+ // - SG writes document 1@cbs1
333
+ // - CBL pulls document 1@cbs1
334
+ // - SG writes document 2@cbs1
335
+ if incomingHLV .DominatesSource (* localCV ) {
336
+ return false
337
+ }
338
+
339
+ // local revision is newer than incoming revision. Common case:
340
+ // - CBL writes document 1@cbl1
341
+ // - CBL pushes to SG as 1@cbl1
342
+ // - CBL pulls document 1@cbl1
343
+ //
344
+ // NOTE: without P2P replication, this should not be the case and we would not get this revision, since CBL
345
+ // would respond to a SG changes message that CBL does not need this revision
346
+ if localHLV .DominatesSource (* incomingCV ) {
347
+ require .FailNow (t , fmt .Sprintf ("incoming CV %#+v has lower version than the local revision %#+v - this should've been filtered via changes response before ending up as a rev. blip tester would reply that to Sync Gateway that it doesn't need this revision" , incomingCV , localHLV ))
348
+ return false
349
+ }
350
+ // Check if conflict has been previously resolved.
351
+ // - If merge versions are empty, then it has not be resolved.
352
+ // - If merge versions do not match, then it has not been resolved.
353
+ if len (incomingHLV .MergeVersions ) != 0 && len (localHLV .MergeVersions ) != 0 && maps .Equal (incomingHLV .MergeVersions , localHLV .MergeVersions ) {
354
+ return false
355
+ }
356
+ return true
357
+ }
358
+
359
+ func (btcc * BlipTesterCollectionClient ) _resolveConflict (incomingHLV * db.HybridLogicalVector , incomingBody []byte , localDoc * clientDocRev ) (body []byte , hlv db.HybridLogicalVector ) {
360
+ switch btcc .parent .ConflictResolver {
324
361
case ConflictResolverLastWriteWins :
325
- if incomingCV .Value > localCV .Value {
326
- return remoteDocWinner
327
- }
328
- return localDocWinner
329
- }
330
- t .Fatalf ("Unknown conflict resolver %q - cannot resolve detected conflict" , conflictResolver )
331
- return localDocWinner // unreachable, but required to satisfy return type
362
+ return btcc ._resolveConflictLWW (incomingHLV , incomingBody , localDoc )
363
+ }
364
+ btcc .TB ().Fatalf ("Unknown conflict resolver %q - cannot resolve detected conflict" , btcc .parent .ConflictResolver )
365
+ return nil , db.HybridLogicalVector {}
366
+ }
367
+
368
+ func (btcc * BlipTesterCollectionClient ) _resolveConflictLWW (incomingHLV * db.HybridLogicalVector , incomingBody []byte , latestLocalRev * clientDocRev ) (body []byte , hlv db.HybridLogicalVector ) {
369
+ latestLocalHLV := latestLocalRev .HLV
370
+ updatedHLV := latestLocalRev .HLV .Copy ()
371
+ // resolve conflict in favor of remote document
372
+ if incomingHLV .Version > latestLocalHLV .Version {
373
+ require .NoError (btcc .TB (), updatedHLV .AddNewerVersions (incomingHLV ))
374
+ return incomingBody , * updatedHLV
375
+ }
376
+ // move all versions from remote HLV to local HLV, this might not be correct since it will invalidate the new mv. AddNewerVersions will update the CV as well.
377
+ require .NoError (btcc .TB (), updatedHLV .AddNewerVersions (incomingHLV ))
378
+ // manually reconstruct the HLV
379
+ // - remove the any pv that contain the merge sourceIDs
380
+ // - add the merge sourceIDs with the incoming version and local version
381
+ // - update the new CV
382
+ delete (updatedHLV .PreviousVersions , incomingHLV .SourceID )
383
+ delete (updatedHLV .PreviousVersions , btcc .parent .SourceID )
384
+ delete (updatedHLV .PreviousVersions , latestLocalHLV .SourceID )
385
+ updatedHLV .SetMergeVersion (incomingHLV .SourceID , incomingHLV .Version )
386
+ updatedHLV .SetMergeVersion (latestLocalHLV .SourceID , latestLocalHLV .Version )
387
+ updatedHLV .SourceID = btcc .parent .SourceID
388
+ updatedHLV .Version = uint64 (btcc .hlc .Now ())
389
+ return latestLocalRev .body , * updatedHLV
332
390
}
333
391
334
392
type BlipTesterCollectionClient struct {
@@ -615,30 +673,12 @@ func (btr *BlipTesterReplicator) handleRev(ctx context.Context, btc *BlipTesterC
615
673
revID := msg .Properties [db .RevMessageRev ]
616
674
deltaSrc := msg .Properties [db .RevMessageDeltaSrc ]
617
675
replacedRev := msg .Properties [db .RevMessageReplacedRev ]
618
- revHistory := msg .Properties [db .RevMessageHistory ]
619
676
620
677
body , err := msg .Body ()
621
678
require .NoError (btr .TB (), err )
622
679
623
680
if msg .Properties [db .RevMessageDeleted ] == "1" {
624
- var incomingVersion DocVersion
625
- var incomingHLV * db.HybridLogicalVector
626
- if btc .UseHLV () {
627
- cv , err := db .ParseVersion (revID )
628
- require .NoError (btr .TB (), err , "error parsing version %q: %v" , revID , err )
629
- incomingVersion = DocVersion {CV : cv }
630
- if revHistory != "" {
631
- // why is rev history not populated for a deleted message?
632
- incomingHLV , _ , err = db .ExtractHLVFromBlipMessage (revHistory )
633
- require .NoError (btr .TB (), err , "error extracting HLV %q: %v" , revHistory , err )
634
- } else {
635
- incomingHLV = db .NewHybridLogicalVector ()
636
- }
637
- require .NoError (btr .TB (), incomingHLV .AddVersion (cv ))
638
- require .Equal (btr .TB (), cv , * incomingHLV .ExtractCurrentVersionFromHLV (), "incoming version CV %#+v from revID should be part of incomingHLV: %#v. Full properties, %#+v" , cv , incomingHLV , msg .Properties )
639
- } else {
640
- incomingVersion = DocVersion {RevTreeID : revID }
641
- }
681
+ incomingHLV , incomingVersion := btc .getVersionsFromRevMessage (msg )
642
682
rev := revOptions {
643
683
incomingVersion : incomingVersion ,
644
684
body : body ,
@@ -806,25 +846,7 @@ func (btr *BlipTesterReplicator) handleRev(ctx context.Context, btc *BlipTesterC
806
846
body , err = base .JSONMarshal (bodyJSON )
807
847
require .NoError (btr .TB (), err )
808
848
}
809
- var incomingVersion DocVersion
810
- var incomingHLV * db.HybridLogicalVector
811
- if btc .UseHLV () {
812
- cv , err := db .ParseVersion (revID )
813
- require .NoError (btr .TB (), err , "error parsing version %q: %v" , revID , err )
814
- incomingVersion = DocVersion {CV : cv }
815
- if revHistory != "" {
816
- incomingHLV , _ , err = db .ExtractHLVFromBlipMessage (revHistory )
817
- require .NoError (btr .TB (), err , "error extracting HLV %q: %v" , revHistory , err )
818
- } else {
819
- // If no revHistory is provided, we need to create a new HLV with the incoming version
820
- incomingHLV = db .NewHybridLogicalVector ()
821
- }
822
- require .NoError (btr .TB (), incomingHLV .AddVersion (cv ))
823
- require .Equal (btr .TB (), cv , * incomingHLV .ExtractCurrentVersionFromHLV (), "incoming version CV %#+v from revID should be part of incomingHLV: %#v. Full properties, %#+v" , cv , incomingHLV , msg .Properties )
824
- } else {
825
- incomingVersion = DocVersion {RevTreeID : revID }
826
- }
827
-
849
+ incomingHLV , incomingVersion := btc .getVersionsFromRevMessage (msg )
828
850
rev := revOptions {
829
851
incomingVersion : incomingVersion ,
830
852
body : body ,
@@ -2015,59 +2037,29 @@ func (btcc *BlipTesterCollectionClient) addRev(ctx context.Context, docID string
2015
2037
defer btcc .seqLock .Unlock ()
2016
2038
newClientSeq := btcc ._nextSequence ()
2017
2039
2018
- winner := remoteDocWinner
2019
- var localRev * clientDocRev
2040
+ newBody := opts . body
2041
+ newVersion := opts . incomingVersion
2020
2042
doc , hasLocalDoc := btcc ._getClientDoc (docID )
2021
- // No conflict resolution for revtree at this time
2022
- var updatedHLV * db.HybridLogicalVector
2023
- if hasLocalDoc {
2024
- localRev = doc ._latestRev (btcc .TB ())
2025
- winner = localRev ._pickConflictWinner (btcc .TB (), opts .incomingVersion .CV , btcc .parent .ConflictResolver )
2026
- // Start HLV with the local version, if it exists
2027
- updatedHLV = localRev .HLV .Copy ()
2043
+ updatedHLV := doc ._getLatestHLVCopy (btcc .TB ())
2044
+ if doc ._hasConflict (btcc .TB (), opts .incomingHLV ) {
2045
+ newBody , updatedHLV = btcc ._resolveConflict (opts .incomingHLV , opts .body , doc ._latestRev (btcc .TB ()))
2046
+ base .DebugfCtx (ctx , base .KeySGTest , "Resolved conflict for docID %q, incomingHLV:%v, existingHLV:%v, updatedHLV:%v" , docID , opts .incomingHLV , doc ._latestRev (btcc .TB ()).HLV , updatedHLV )
2028
2047
} else {
2029
- updatedHLV = db .NewHybridLogicalVector ()
2048
+ base .DebugfCtx (ctx , base .KeySGTest , "No conflict" )
2049
+ if btcc .UseHLV () {
2050
+ // Add the incoming HLV to the local HLV, regardless of winner
2051
+ require .NoError (btcc .TB (), updatedHLV .AddNewerVersions (opts .incomingHLV ))
2052
+ }
2030
2053
}
2054
+ newVersion .CV = * updatedHLV .ExtractCurrentVersionFromHLV ()
2031
2055
// ConflictResolver is currently on BlipTesterClient, but might be per replication in the future.
2032
2056
docRev := clientDocRev {
2033
2057
clientSeq : newClientSeq ,
2034
2058
isDelete : opts .isDelete ,
2035
2059
message : opts .msg ,
2036
- HLV : * updatedHLV ,
2037
- }
2038
- switch winner {
2039
- case remoteDocWinner :
2040
- docRev .body = opts .body
2041
- docRev .version = opts .incomingVersion
2042
- if btcc .UseHLV () {
2043
- // Add the incoming HLV to the local HLV, regardless of winner
2044
- require .NoError (btcc .TB (), docRev .HLV .AddNewerVersions (opts .incomingHLV ))
2045
- }
2046
- case localDocWinner :
2047
- docRev .body = doc ._latestRev (btcc .TB ()).body
2048
- docRev .version = DocVersion {
2049
- RevTreeID : opts .incomingVersion .RevTreeID ,
2050
- }
2051
-
2052
- if btcc .UseHLV () {
2053
- // move all versions from remote HLV to local HLV
2054
- require .NoError (btcc .TB (), docRev .HLV .AddNewerVersions (opts .incomingHLV ))
2055
- // manually reconstruct the HLV
2056
- // - remove the any pv that contain the merge sourceIDs
2057
- // - add the merge sourceIDs with the incoming version and local version
2058
- // - update the new CV
2059
- delete (docRev .HLV .PreviousVersions , opts .incomingHLV .SourceID )
2060
- delete (docRev .HLV .PreviousVersions , btcc .parent .SourceID )
2061
- docRev .HLV .SetMergeVersion (opts .incomingHLV .SourceID , opts .incomingHLV .Version )
2062
- docRev .HLV .SetMergeVersion (localRev .HLV .SourceID , localRev .HLV .Version )
2063
- newCV := db.Version {SourceID : btcc .parent .SourceID , Value : uint64 (btcc .hlc .Now ())}
2064
- // can not call AddVersion, since that will invalidate mv and move to pv. Set SourceID and Version directly on HybridLogicalVector
2065
- docRev .HLV .SourceID = newCV .SourceID
2066
- docRev .HLV .Version = newCV .Value
2067
- docRev .version .CV = newCV
2068
- }
2069
- default :
2070
- require .FailNow (btcc .TB (), fmt .Sprintf ("Unknown type of document winner %#v" , winner ))
2060
+ body : newBody ,
2061
+ HLV : updatedHLV ,
2062
+ version : newVersion ,
2071
2063
}
2072
2064
2073
2065
if ! hasLocalDoc {
@@ -2088,8 +2080,7 @@ func (btcc *BlipTesterCollectionClient) addRev(ctx context.Context, docID string
2088
2080
doc ._latestServerVersion = opts .incomingVersion
2089
2081
}
2090
2082
// if we resolved a conflict, then we might need to push this conflict back
2091
- if winner == localDocWinner {
2092
- base .DebugfCtx (ctx , base .KeySGTest , "Using LWW to resolve a conflicted revision and picking CBL version. Incoming HLV: %#+v, Local HLV %#+v, Result HLV %#+v" , opts .incomingHLV , localRev .HLV , docRev .HLV )
2083
+ if newVersion != opts .incomingVersion {
2093
2084
btcc ._seqCond .Broadcast ()
2094
2085
}
2095
2086
}
@@ -2116,3 +2107,22 @@ func (btc *BlipTesterClient) AssertDeltaSrcProperty(t *testing.T, msg *blip.Mess
2116
2107
rev := docVersion .GetRev (subProtocol >= db .CBMobileReplicationV4 )
2117
2108
assert .Equal (t , rev , msg .Properties [db .RevMessageDeltaSrc ])
2118
2109
}
2110
+
2111
+ // getHLVFromRevMessage extracts the full HLV from a rev message. This will fail the test if the message does not contain a valid HLV.
2112
+ func (btc * BlipTesterClient ) getVersionsFromRevMessage (msg * blip.Message ) (* db.HybridLogicalVector , DocVersion ) {
2113
+ revID := msg .Properties [db .RevMessageRev ]
2114
+ require .NotEmpty (btc .TB (), revID , "revID is empty in message %#+v" , msg .Properties )
2115
+ if ! btc .UseHLV () {
2116
+ return nil , DocVersion {RevTreeID : revID }
2117
+ }
2118
+ revHistory := msg .Properties [db .RevMessageHistory ]
2119
+ hlvStr := revID
2120
+ if revHistory != "" {
2121
+ hlvStr += "," + revHistory
2122
+ }
2123
+ hlv , _ , err := db .ExtractHLVFromBlipMessage (hlvStr )
2124
+ require .NoError (btc .TB (), err )
2125
+ require .NotEmpty (btc .TB (), hlv .SourceID , "HLV SourceID is empty from message %#+v, hlv=%q" , msg .Properties , hlvStr )
2126
+ require .NotEmpty (btc .TB (), hlv .Version , "HLV Version is empty from message %#+v, hlv=%q" , msg .Properties , hlvStr )
2127
+ return hlv , DocVersion {CV : * hlv .ExtractCurrentVersionFromHLV ()}
2128
+ }
0 commit comments