Skip to content

CBG-4267 implement LWW conflict resolution for blip tester #7657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime/debug"
Expand Down Expand Up @@ -1104,18 +1105,9 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
history = append(history, strings.Split(historyStr, ",")...)
}
} else {
versionVectorStr := rev
if historyStr != "" {
// this means that there is a mv
if strings.Contains(historyStr, ";") {
versionVectorStr += "," + historyStr
} else {
versionVectorStr += ";" + historyStr
}
}
incomingHLV, legacyRevList, err = ExtractHLVFromBlipMessage(versionVectorStr)
incomingHLV, legacyRevList, err = GetHLVFromRevMessage(rq)
if err != nil {
base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err)
base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. Error: %v", base.UD(docID), err)
return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message")
}
newDoc.HLV = incomingHLV
Expand Down Expand Up @@ -1689,3 +1681,24 @@ func allowedAttachmentKey(docID, digest string, activeCBMobileSubprotocol CBMobi
func (bh *blipHandler) logEndpointEntry(profile, endpoint string) {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "#%d: Type:%s %s", bh.serialNumber, profile, endpoint)
}

// GetHLVFromRevMessage extracts the full HLV from a rev message. This will fail the test if the message does not contain a valid HLV.
//
// Function will return list of revIDs if legacy revtree IDs were found in the HLV history section (PV)
func GetHLVFromRevMessage(msg *blip.Message) (*HybridLogicalVector, []string, error) {
revID := msg.Properties[RevMessageRev]
if revID == "" {
return nil, nil, errors.New("RevID property is empty")
}
versionVectorStr := revID
historyStr := msg.Properties[RevMessageHistory]
if historyStr != "" {
// this means that there is a mv
if strings.Contains(historyStr, ";") {
versionVectorStr += "," + historyStr
} else {
versionVectorStr += ";" + historyStr
}
}
return extractHLVFromBlipString(versionVectorStr)
}
6 changes: 4 additions & 2 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if bsc.useDeltas && len(knownRevsArray) > 0 {
if revID, ok := knownRevsArray[0].(string); ok {
if versionVectorProtocol {
msgHLV, _, err := ExtractHLVFromBlipMessage(revID)
// Is this full history?
msgHLV, _, err := extractHLVFromBlipString(revID)
if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID))
deltaSrcRevID = "" // will force falling back to full body replication below
Expand All @@ -387,7 +388,8 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b

for _, rev := range knownRevsArray {
if revID, ok := rev.(string); ok {
msgHLV, _, err := ExtractHLVFromBlipMessage(revID)
// Is this full history?
msgHLV, _, err := extractHLVFromBlipString(revID)
if err != nil {
// assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication
if versionVectorProtocol {
Expand Down
2 changes: 1 addition & 1 deletion db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3313,7 +3313,7 @@ func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context,
// Temporary (CBG-4466): check the full HLV that's being sent by CBL with proposeChanges messages.
// If the current server cv is dominated by the incoming HLV (i.e. the incoming HLV has an entry for the same source
// with a version that's greater than or equal to the server's cv), then we can accept the proposed version.
proposedHLV, _, err := ExtractHLVFromBlipMessage(proposedHLVString)
proposedHLV, _, err := extractHLVFromBlipString(proposedHLVString)
if err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "CheckProposedVersion for doc %s unable to extract proposedHLV from rev message, will be treated as conflict: %v", base.UD(docid), err)
} else if proposedHLV.DominatesSource(localDocCV) {
Expand Down
39 changes: 29 additions & 10 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (v Version) IsEmpty() bool {
return v.SourceID == "" && v.Value == 0
}

// Equal returns true if sourceID and value of the two versions are equal.
func (v Version) Equal(other Version) bool {
return v.SourceID == other.SourceID && v.Value == other.Value
}

// StringForVersionDelta will take a version struct and convert the value to delta format
// (encoding it to LE hex, stripping any 0's off the end and stripping leading 0x)
func (v Version) StringForVersionDelta() string {
Expand Down Expand Up @@ -183,6 +188,7 @@ func NewHybridLogicalVector() *HybridLogicalVector {
}
}

// Equal compares the full HLV to another HLV.
func (hlv *HybridLogicalVector) Equal(other *HybridLogicalVector) bool {
if hlv.SourceID != other.SourceID {
return false
Expand All @@ -202,6 +208,19 @@ func (hlv *HybridLogicalVector) Equal(other *HybridLogicalVector) bool {
return true
}

func (hlv *HybridLogicalVector) Copy() *HybridLogicalVector {
if hlv == nil {
return nil
}
return &HybridLogicalVector{
CurrentVersionCAS: hlv.CurrentVersionCAS,
SourceID: hlv.SourceID,
Version: hlv.Version,
MergeVersions: maps.Clone(hlv.MergeVersions),
PreviousVersions: maps.Clone(hlv.PreviousVersions),
}
}

// GetCurrentVersion returns the current version from the HLV in memory.
func (hlv *HybridLogicalVector) GetCurrentVersion() (string, uint64) {
return hlv.SourceID, hlv.Version
Expand Down Expand Up @@ -275,7 +294,7 @@ func (hlv *HybridLogicalVector) InvalidateMV() {
if source == hlv.SourceID {
continue
}
hlv.setPreviousVersion(source, value)
hlv.SetPreviousVersion(source, value)
}
hlv.MergeVersions = nil
}
Expand Down Expand Up @@ -340,21 +359,21 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto

// Copy incoming merge versions (previously existing merge versions will have been moved to pv by AddVersion)
for i, v := range otherVector.MergeVersions {
hlv.setMergeVersion(i, v)
hlv.SetMergeVersion(i, v)
}

if len(otherVector.PreviousVersions) != 0 {
// Iterate through incoming vector previous versions, update with the version from other vector
// for source if the local version for that source is lower
for i, v := range otherVector.PreviousVersions {
if hlv.PreviousVersions[i] == 0 {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
} else {
// if we get here then there is entry for this source in PV so we must check if its newer or not
otherHLVPVValue := v
localHLVPVValue := hlv.PreviousVersions[i]
if localHLVPVValue < otherHLVPVValue {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
}
}
}
Expand Down Expand Up @@ -386,16 +405,16 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi
return outputSpec
}

// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) {
// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) {
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(HLVVersions)
}
hlv.PreviousVersions[source] = version
}

// setMergeVersion will take a source/version pair and add it to the HLV merge versions map
func (hlv *HybridLogicalVector) setMergeVersion(source string, version uint64) {
// SetMergeVersion will take a source/version pair and add it to the HLV merge versions map
func (hlv *HybridLogicalVector) SetMergeVersion(source string, version uint64) {
if hlv.MergeVersions == nil {
hlv.MergeVersions = make(HLVVersions)
}
Expand Down Expand Up @@ -456,15 +475,15 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec,

}

// ExtractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip
// extractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip
// blip string may be the following formats
// 1. cv only: cv
// 2. cv and pv: cv;pv
// 3. cv, pv, and mv: cv;mv;pv
//
// Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV)
// TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL
func ExtractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, []string, error) {
func extractHLVFromBlipString(versionVectorStr string) (*HybridLogicalVector, []string, error) {
hlv := &HybridLogicalVector{}

vectorFields := strings.Split(versionVectorStr, ";")
Expand Down
12 changes: 6 additions & 6 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func TestInvalidHLVInBlipMessageForm(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
require.NotEmpty(t, testCase.errMsg) // make sure err msg is specified
hlv, legacyRevs, err := ExtractHLVFromBlipMessage(testCase.hlv)
hlv, legacyRevs, err := extractHLVFromBlipString(testCase.hlv)
require.ErrorContains(t, err, testCase.errMsg, "expected err for %s", testCase.hlv)
require.Nil(t, hlv)
require.Nil(t, legacyRevs)
Expand Down Expand Up @@ -854,12 +854,12 @@ func getHLVTestCases(t testing.TB) []extractHLVFromBlipMsgBMarkCases {
}

// TestExtractHLVFromChangesMessage:
// - Each test case gets run through ExtractHLVFromBlipMessage and assert that the resulting HLV
// - Each test case gets run through extractHLVFromBlipString and assert that the resulting HLV
// is correct to what is expected
func TestExtractHLVFromChangesMessage(t *testing.T) {
for _, test := range getHLVTestCases(t) {
t.Run(test.name, func(t *testing.T) {
hlv, legacyRevs, err := ExtractHLVFromBlipMessage(test.hlvString)
hlv, legacyRevs, err := extractHLVFromBlipString(test.hlvString)
require.NoError(t, err)

require.Equal(t, test.expectedHLV, *hlv, "HLV not parsed correctly for %s", test.hlvString)
Expand Down Expand Up @@ -930,7 +930,7 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) {
for _, bm := range getHLVTestCases(b) {
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _, _ = ExtractHLVFromBlipMessage(bm.hlvString)
_, _, _ = extractHLVFromBlipString(bm.hlvString)
}
})
}
Expand Down Expand Up @@ -1398,15 +1398,15 @@ func TestAddVersion(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

hlv, _, err := ExtractHLVFromBlipMessage(tc.initialHLV)
hlv, _, err := extractHLVFromBlipString(tc.initialHLV)
require.NoError(t, err, "unable to parse initialHLV")
newVersion, err := ParseVersion(tc.newVersion)
require.NoError(t, err)

err = hlv.AddVersion(newVersion)
require.NoError(t, err)

expectedHLV, _, err := ExtractHLVFromBlipMessage(tc.expectedHLV)
expectedHLV, _, err := extractHLVFromBlipString(tc.expectedHLV)
require.NoError(t, err)
require.True(t, hlv.Equal(expectedHLV), "expected %#v does not match actual %#v", expectedHLV, hlv)

Expand Down
2 changes: 1 addition & 1 deletion db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (c *DatabaseCollectionWithUser) UpsertTestDocWithVersion(ctx context.Contex
versions, _, err := parseVectorValues(mergeVersionsStr)
require.NoError(t, err, "malformed mergeVersionsStr")
for _, version := range versions {
newDocHLV.setMergeVersion(version.SourceID, version.Value)
newDocHLV.SetMergeVersion(version.SourceID, version.Value)
}
}

Expand Down
56 changes: 56 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3569,3 +3569,59 @@ func TestBlipPushRevOnResurrection(t *testing.T) {
})
}
}

func TestBlipPullConflict(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeySync, base.KeySyncMsg, base.KeySGTest)
btcRunner := NewBlipTesterClientRunner(t)

btcRunner.SkipSubtest[RevtreeSubtestName] = true

btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTesterPersistentConfig(t)
defer rt.Close()

const (
username = "alice"
cblBody = `{"actor": "cbl"}`
sgBody = `{"actor": "sg"}`
)
rt.CreateUser(username, []string{"*"})
docID := "doc1"
sgVersion := rt.PutDocDirectly(docID, db.Body{"actor": "sg"})

opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols, Username: "alice"}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
defer btc.Close()

client := btcRunner.SingleCollection(btc.id)
preConflictCBLVersion := btcRunner.AddRev(btc.id, docID, EmptyDocVersion(), []byte(cblBody))
require.NotEqual(t, sgVersion, preConflictCBLVersion)

btcRunner.StartOneshotPull(btc.id)

require.EventuallyWithT(t, func(c *assert.CollectT) {
_, _, postConflictCBLVersion := client.GetDoc(docID)
assert.NotEqual(t, preConflictCBLVersion, postConflictCBLVersion)
}, time.Second*10, time.Millisecond*10, "Expected sgVersion and cblVersion to be different")

postConflictDoc, postConflictHLV, postConflictVersion := client.GetDoc(docID)
require.Equal(t, cblBody, string(postConflictDoc))
// after resolving the conflict, the CBL version should remain the same but the ver of the CV is
// updated to be newer than the pre-conflict CBL version
require.Equal(t, preConflictCBLVersion.CV.SourceID, postConflictVersion.CV.SourceID)
require.Greater(t, postConflictVersion.CV.Value, preConflictCBLVersion.CV.Value, "PreConflictHLV %#v PostConflictHLV %#v", preConflictCBLVersion, postConflictHLV)
require.Empty(t, postConflictHLV.PreviousVersions, "postConflictHLV: %#+v\n", postConflictHLV)
require.Equal(t, db.HLVVersions{
sgVersion.CV.SourceID: sgVersion.CV.Value,
preConflictCBLVersion.CV.SourceID: preConflictCBLVersion.CV.Value,
}, postConflictHLV.MergeVersions)

btcRunner.StartPush(btc.id)
rt.WaitForVersion(docID, *postConflictVersion)

collection, ctx := rt.GetSingleTestDatabaseCollection()
bucketDoc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalAll)
require.NoError(t, err)
require.True(t, bucketDoc.HLV.Equal(postConflictHLV), "Expected bucket doc HLV to match post-conflict HLV, got %#v, expected %#v", bucketDoc.HLV, postConflictHLV)
})
}
Loading
Loading