Skip to content

CBG-4267 implement LWW conflict resolution for blip tester #7657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 13, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (v Version) IsEmpty() bool {
return v.SourceID == "" && v.Value == 0
}

// Equal returns true if sourceID and value of the two versions are equal.
func (v Version) Equal(other Version) bool {
return v.SourceID == other.SourceID && v.Value == other.Value
}

// StringForVersionDelta will take a version struct and convert the value to delta format
// (encoding it to LE hex, stripping any 0's off the end and stripping leading 0x)
func (v Version) StringForVersionDelta() string {
Expand Down Expand Up @@ -179,7 +184,8 @@ func NewHybridLogicalVector() *HybridLogicalVector {
}
}

func (hlv *HybridLogicalVector) Equals(other *HybridLogicalVector) bool {
// Equal compares the full HLV to another HLV.
func (hlv *HybridLogicalVector) Equal(other *HybridLogicalVector) bool {
if hlv.SourceID != other.SourceID {
return false
}
Expand Down Expand Up @@ -271,7 +277,7 @@ func (hlv *HybridLogicalVector) InvalidateMV() {
if source == hlv.SourceID {
continue
}
hlv.setPreviousVersion(source, value)
hlv.SetPreviousVersion(source, value)
}
hlv.MergeVersions = nil
}
Expand Down Expand Up @@ -336,21 +342,21 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto

// Copy incoming merge versions (previously existing merge versions will have been moved to pv by AddVersion)
for i, v := range otherVector.MergeVersions {
hlv.setMergeVersion(i, v)
hlv.SetMergeVersion(i, v)
}

if len(otherVector.PreviousVersions) != 0 {
// Iterate through incoming vector previous versions, update with the version from other vector
// for source if the local version for that source is lower
for i, v := range otherVector.PreviousVersions {
if hlv.PreviousVersions[i] == 0 {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
} else {
// if we get here then there is entry for this source in PV so we must check if its newer or not
otherHLVPVValue := v
localHLVPVValue := hlv.PreviousVersions[i]
if localHLVPVValue < otherHLVPVValue {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
}
}
}
Expand Down Expand Up @@ -382,16 +388,16 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi
return outputSpec
}

// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) {
// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) {
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(HLVVersions)
}
hlv.PreviousVersions[source] = version
}

// setMergeVersion will take a source/version pair and add it to the HLV merge versions map
func (hlv *HybridLogicalVector) setMergeVersion(source string, version uint64) {
// SetMergeVersion will take a source/version pair and add it to the HLV merge versions map
func (hlv *HybridLogicalVector) SetMergeVersion(source string, version uint64) {
if hlv.MergeVersions == nil {
hlv.MergeVersions = make(HLVVersions)
}
Expand Down
2 changes: 1 addition & 1 deletion db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,7 @@ func TestAddVersion(t *testing.T) {

expectedHLV, _, err := ExtractHLVFromBlipMessage(tc.expectedHLV)
require.NoError(t, err)
require.True(t, hlv.Equals(expectedHLV), "expected %#v does not match actual %#v", expectedHLV, hlv)
require.True(t, hlv.Equal(expectedHLV), "expected %#v does not match actual %#v", expectedHLV, hlv)

})
}
Expand Down
2 changes: 1 addition & 1 deletion db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (c *DatabaseCollectionWithUser) UpsertTestDocWithVersion(ctx context.Contex
versions, _, err := parseVectorValues(mergeVersionsStr)
require.NoError(t, err, "malformed mergeVersionsStr")
for _, version := range versions {
newDocHLV.setMergeVersion(version.SourceID, version.Value)
newDocHLV.SetMergeVersion(version.SourceID, version.Value)
}
}

Expand Down
57 changes: 57 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3566,3 +3566,60 @@ func TestBlipPushRevOnResurrection(t *testing.T) {
})
}
}

func TestBlipPullConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelInfo, base.KeySync, base.KeySyncMsg, base.KeySGTest)
btcRunner := NewBlipTesterClientRunner(t)

btcRunner.SkipSubtest[RevtreeSubtestName] = true

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTesterPersistentConfig(t)
defer rt.Close()

const (
username = "alice"
cblBody = `{"actor": "cbl"}`
sgBody = `{"actor": "sg"}`
)
rt.CreateUser(username, []string{"*"})
docID := "doc1"
sgVersion := rt.PutDocDirectly(docID, db.Body{"actor": "sg"})

opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols, Username: "alice"}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
defer btc.Close()

client := btcRunner.SingleCollection(btc.id)
preConflictCBLVersion := btcRunner.AddRev(btc.id, docID, EmptyDocVersion(), []byte(cblBody))
require.NotEqual(t, sgVersion, preConflictCBLVersion)

btcRunner.StartOneshotPull(btc.id)

require.EventuallyWithT(t, func(c *assert.CollectT) {
_, _, postConflictCBLVersion := client.GetDoc(docID)
assert.NotEqual(t, sgVersion, postConflictCBLVersion)
}, time.Second*10, time.Millisecond*10, "Expected sgVersion and cblVersion to be different")

postConflictDoc, postConflictHLV, postConflictVersion := client.GetDoc(docID)
require.Equal(t, cblBody, string(postConflictDoc))
// after resolving the conflict, the CBL version should remain the same but the ver of the CV is
// updated to be newer than the pre-conflict CBL version
require.Equal(t, preConflictCBLVersion.CV.SourceID, postConflictVersion.CV.SourceID)
require.Greater(t, postConflictVersion.CV.Value, preConflictCBLVersion.CV.Value)
require.Empty(t, postConflictHLV.PreviousVersions)
require.Equal(t, db.HLVVersions{
sgVersion.CV.SourceID: sgVersion.CV.Value,
preConflictCBLVersion.CV.SourceID: preConflictCBLVersion.CV.Value,
}, postConflictHLV.MergeVersions)

btcRunner.StartPush(btc.id)
rt.WaitForVersion(docID, *postConflictVersion)

collection, ctx := rt.GetSingleTestDatabaseCollection()
bucketDoc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalAll)
require.NoError(t, err)
require.True(t, bucketDoc.HLV.Equal(postConflictHLV), "Expected bucket doc HLV to match post-conflict HLV, got %v, expected %v", bucketDoc.HLV, postConflictHLV)
})

}
Loading
Loading