Skip to content

Commit 682bccf

Browse files
committed
CBG-4790: reconcile rev tree on write for ISGR
1 parent 38f3742 commit 682bccf

File tree

9 files changed

+531
-112
lines changed

9 files changed

+531
-112
lines changed

db/blip_handler.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,9 +1094,6 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
10941094
var incomingHLV *HybridLogicalVector
10951095
// Build history/HLV
10961096
var legacyRevList []string
1097-
// we can probably use legacyRevList instead of this but to avoid hooking this up to write code we will use
1098-
// separate list for now, pending CBG-4790
1099-
var revTreeProperty []string
11001097
changeIsVector := strings.Contains(rev, "@")
11011098
if !bh.useHLV() || !changeIsVector {
11021099
newDoc.RevID = rev
@@ -1113,11 +1110,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
11131110
newDoc.HLV = incomingHLV
11141111
}
11151112

1113+
isBlipRevTreeProperty := false
11161114
// if the client is SGW and there are no legacy revs being sent (i.e. doc is not a pre-upgraded doc) check the rev tree property
11171115
if bh.clientType == BLIPClientTypeSGR2 && len(legacyRevList) == 0 {
11181116
revTree, ok := rq.Properties[RevMessageTreeHistory]
11191117
if ok {
1120-
revTreeProperty = append(revTreeProperty, strings.Split(revTree, ",")...) // nolint: all
1118+
legacyRevList = append(legacyRevList, strings.Split(revTree, ",")...)
1119+
isBlipRevTreeProperty = true
11211120
}
11221121
}
11231122

@@ -1341,7 +1340,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
13411340
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
13421341
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
13431342
if bh.useHLV() && changeIsVector {
1344-
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc, legacyRevList)
1343+
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc, legacyRevList, isBlipRevTreeProperty)
13451344
} else if bh.conflictResolver != nil {
13461345
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
13471346
} else {

db/blip_sync_context.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
372372
if bsc.useDeltas && len(knownRevsArray) > 0 {
373373
if revID, ok := knownRevsArray[0].(string); ok {
374374
if versionVectorProtocol {
375-
// Is this full history?
375+
// extract cv from the known revs array
376376
msgHLV, _, err := extractHLVFromBlipString(revID)
377377
if err != nil {
378378
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID))
@@ -386,18 +386,28 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
386386
}
387387
}
388388

389+
var cvInKnownRevs bool
390+
389391
for _, rev := range knownRevsArray {
390392
if revID, ok := rev.(string); ok {
391-
// Is this full history?
393+
// extract cv from the known revs array
392394
msgHLV, _, err := extractHLVFromBlipString(revID)
393395
if err != nil {
394-
// assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication
396+
// assume we have received legacy rev if the following conditions are met:
397+
// - we cannot parse cv from known revs
398+
// - we are in vv enabled replication
399+
// - the first element of known rev array was NOT a CV
395400
if versionVectorProtocol {
396-
legacyRev = true
401+
if !cvInKnownRevs {
402+
// if cv wasn't the first element in known revs array, this will be legacy doc
403+
legacyRev = true
404+
}
397405
}
398406
} else {
399407
// extract cv as string
400408
revID = msgHLV.GetCurrentVersionString()
409+
// mark that we have seen a CV in known revs so we can mark as non legacy rev
410+
cvInKnownRevs = true
401411
}
402412
// we can assume here that if we fail to parse hlv, we have received a rev id in known revs. If we don't fail to parse hlv
403413
// then we have extracted cv from it and can assign the cv string to known revs here

db/crud.go

Lines changed: 105 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,7 +1211,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
12111211
return newRevID, doc, err
12121212
}
12131213

1214-
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument, revTreeHistory []string) (doc *Document, cv *Version, newRevID string, err error) {
1214+
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument, revTreeHistory []string, revTreeISGRProperty bool) (doc *Document, cv *Version, newRevID string, err error) {
12151215
var matchRev string
12161216
if existingDoc != nil {
12171217
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
@@ -1262,7 +1262,14 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
12621262
} else {
12631263
previousRevTreeID = revTreeHistory[0]
12641264
prevGeneration, _ = ParseRevID(ctx, previousRevTreeID)
1265-
newGeneration = prevGeneration + 1
1265+
// if incoming rev tree list is from a legacy pre upgraded doc, we should have new revID generation based
1266+
// off the previous current rev +1. If we have rev tree list filled from ISGR's rev tree property then we
1267+
// should use the current rev of inc
1268+
if !revTreeISGRProperty {
1269+
newGeneration = prevGeneration + 1
1270+
} else {
1271+
newGeneration = prevGeneration
1272+
}
12661273
}
12671274
revTreeConflictChecked := false
12681275
var parent string
@@ -1276,6 +1283,12 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
12761283
if addNewerVersionsErr != nil {
12771284
return nil, nil, false, nil, addNewerVersionsErr
12781285
}
1286+
if revTreeISGRProperty {
1287+
err = doc.alignRevTreeHistory(ctx, newDoc, revTreeHistory)
1288+
if err != nil {
1289+
return nil, nil, false, nil, err
1290+
}
1291+
}
12791292
} else {
12801293
if doc.HLV.isDominating(newDocHLV) {
12811294
base.DebugfCtx(ctx, base.KeyCRUD, "PutExistingCurrentVersion(%q): No new versions to add. existing: %#v new:%#v", base.UD(newDoc.ID), doc.HLV, newDocHLV)
@@ -1289,15 +1302,28 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
12891302
}
12901303
// the new document has a dominating hlv, so we can ignore any legacy rev revtree information on the incoming document
12911304
revTreeConflictChecked = true
1292-
previousRevTreeID = doc.CurrentRev
1305+
if !revTreeISGRProperty {
1306+
previousRevTreeID = doc.CurrentRev
1307+
} else {
1308+
// align rev tree here for ISGR replications
1309+
err = doc.alignRevTreeHistory(ctx, newDoc, revTreeHistory)
1310+
if err != nil {
1311+
return nil, nil, false, nil, err
1312+
}
1313+
}
12931314
} else {
1294-
if len(revTreeHistory) > 0 {
1315+
// if the legacy rev list is from ISGR property, we should not do a conflict check
1316+
if len(revTreeHistory) > 0 && !revTreeISGRProperty {
12951317
// conflict check on rev tree history, if there is a rev in rev tree history we have the parent of locally we are not in conflict
12961318
parent, currentRevIndex, err = db.revTreeConflictCheck(ctx, revTreeHistory, doc, newDoc.Deleted)
12971319
if err != nil {
12981320
base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s, incoming version %s, local version %s, and conflict found in rev tree history", base.UD(doc.ID), newDocHLV.GetCurrentVersionString(), doc.HLV.GetCurrentVersionString())
12991321
return nil, nil, false, nil, err
13001322
}
1323+
err = doc.addNewerRevisionsToHistory(newDoc, currentRevIndex, parent, revTreeHistory)
1324+
if err != nil {
1325+
return nil, nil, false, nil, err
1326+
}
13011327
revTreeConflictChecked = true
13021328
addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV)
13031329
if addNewerVersionsErr != nil {
@@ -1317,24 +1343,15 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
13171343
// rev tree conflict check if we have rev tree history to check against + finds current rev index to allow us
13181344
// to add any new revision to rev tree below.
13191345
// Only check for rev tree conflicts if we haven't already checked above
1320-
if !revTreeConflictChecked && len(revTreeHistory) > 0 {
1346+
if !revTreeConflictChecked && len(revTreeHistory) > 0 && !revTreeISGRProperty {
13211347
parent, currentRevIndex, err = db.revTreeConflictCheck(ctx, revTreeHistory, doc, newDoc.Deleted)
13221348
if err != nil {
13231349
return nil, nil, false, nil, err
13241350
}
1325-
}
1326-
// Add all the new revisions to the rev tree:
1327-
for i := currentRevIndex - 1; i >= 0; i-- {
1328-
err := doc.History.addRevision(newDoc.ID,
1329-
RevInfo{
1330-
ID: revTreeHistory[i],
1331-
Parent: parent,
1332-
Deleted: i == 0 && newDoc.Deleted})
1333-
1351+
err = doc.addNewerRevisionsToHistory(newDoc, currentRevIndex, parent, revTreeHistory)
13341352
if err != nil {
13351353
return nil, nil, false, nil, err
13361354
}
1337-
parent = revTreeHistory[i]
13381355
}
13391356

13401357
// Process the attachments, replacing bodies with digests.
@@ -1344,16 +1361,23 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
13441361
}
13451362

13461363
// generate rev id for new arriving doc
1347-
strippedBody, _ := stripInternalProperties(newDoc._body)
1348-
encoding, err := base.JSONMarshalCanonical(strippedBody)
1349-
if err != nil {
1350-
return nil, nil, false, nil, err
1351-
}
1352-
newRev := CreateRevIDWithBytes(newGeneration, previousRevTreeID, encoding)
1353-
1354-
if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: previousRevTreeID, Deleted: newDoc.Deleted}); err != nil {
1355-
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
1356-
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
1364+
var newRev string
1365+
if !revTreeISGRProperty {
1366+
// create a new revID for incoming write
1367+
strippedBody, _ := stripInternalProperties(newDoc._body)
1368+
encoding, err := base.JSONMarshalCanonical(strippedBody)
1369+
if err != nil {
1370+
return nil, nil, false, nil, err
1371+
}
1372+
newRev = CreateRevIDWithBytes(newGeneration, previousRevTreeID, encoding)
1373+
if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: previousRevTreeID, Deleted: newDoc.Deleted}); err != nil {
1374+
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
1375+
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
1376+
}
1377+
} else {
1378+
// for ISGR, incoming writes current rev should be the most recent rev in history. This aligns the
1379+
// rev history each of the replication
1380+
newRev = previousRevTreeID
13571381
}
13581382

13591383
newDoc.RevID = newRev
@@ -3137,7 +3161,6 @@ func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, do
31373161
if strings.HasPrefix(docid, "_design/") && db.user != nil {
31383162
return // Users can't upload design docs, so ignore them
31393163
}
3140-
// todo: CBG-4782 utilise known revs for rev tree property in ISGR by returning know rev tree id's in possible list
31413164

31423165
doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalSync)
31433166
if err != nil {
@@ -3147,6 +3170,14 @@ func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, do
31473170
missing = append(missing, rev)
31483171
return
31493172
}
3173+
if doc.HLV == nil {
3174+
// no hlv on local doc, mark as missing but send current rev as known rev (will be handled as legacy
3175+
// rev document on changes response handler)
3176+
base.DebugfCtx(ctx, base.KeyChanges, "Doc %s has no HLV, marking change version %s as missing", base.UD(docid), base.UD(rev))
3177+
missing = append(missing, rev)
3178+
possible = append(possible, doc.CurrentRev)
3179+
return
3180+
}
31503181
// parse in coming version, if it's not known to local doc hlv then it is marked as missing, if it is and is a newer version
31513182
// then it is also marked as missing
31523183
cvValue, err := ParseVersion(rev)
@@ -3160,6 +3191,13 @@ func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, do
31603191
// incoming version is dominated by local doc hlv, so it is not missing
31613192
return
31623193
}
3194+
3195+
// return the local current rev as known rev, this will mean if you have rev 1,2,3 and remote has rev 1,2,3,4,5 then
3196+
// remote should only send rev 4,5 in rev tree property on the subsequent rev message for this document, we also need to
3197+
// send cv as first element for delta sync purposes
3198+
possible = append(possible, doc.HLV.GetCurrentVersionString())
3199+
possible = append(possible, doc.CurrentRev)
3200+
31633201
missing = append(missing, rev)
31643202
return
31653203
}
@@ -3328,6 +3366,47 @@ func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context,
33283366
}
33293367
}
33303368

3369+
func (doc *Document) alignRevTreeHistory(ctx context.Context, newDoc *Document, revTreeHistory []string) error {
3370+
currentRevIndex := len(revTreeHistory)
3371+
parent := ""
3372+
for i, revid := range revTreeHistory {
3373+
if doc.History.contains(revid) {
3374+
currentRevIndex = i
3375+
parent = revid
3376+
break
3377+
}
3378+
}
3379+
3380+
if parent != doc.CurrentRev {
3381+
base.DebugfCtx(ctx, base.KeyCRUD, "incoming rev tree history has different history than local doc %s, aligning local doc history with incoming rev tree history", base.UD(doc.ID))
3382+
// clean local history and make way for incoming history to replace it
3383+
doc.History = make(RevTree)
3384+
}
3385+
3386+
err := doc.addNewerRevisionsToHistory(newDoc, currentRevIndex, parent, revTreeHistory)
3387+
if err != nil {
3388+
return err
3389+
}
3390+
return nil
3391+
}
3392+
3393+
func (doc *Document) addNewerRevisionsToHistory(newDoc *Document, currentRevIndex int, parent string, docHistory []string) error {
3394+
for i := currentRevIndex - 1; i >= 0; i-- {
3395+
err := doc.History.addRevision(newDoc.ID,
3396+
RevInfo{
3397+
ID: docHistory[i],
3398+
Parent: parent,
3399+
Deleted: i == 0 && newDoc.Deleted})
3400+
3401+
if err != nil {
3402+
return err
3403+
}
3404+
parent = docHistory[i]
3405+
}
3406+
doc.CurrentRev = parent
3407+
return nil
3408+
}
3409+
33313410
const (
33323411
xattrMacroCas = "cas" // SyncData.Cas
33333412
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c

db/crud_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,7 +1773,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
17731773
PreviousVersions: pv,
17741774
}
17751775

1776-
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
1776+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil, false)
17771777
assertHTTPError(t, err, 409)
17781778
require.Nil(t, doc)
17791779
require.Nil(t, cv)
@@ -1783,7 +1783,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
17831783
// TODO: because currentRev isn't being updated, storeOldBodyInRevTreeAndUpdateCurrent isn't
17841784
// updating the document body. Need to review whether it makes sense to keep using
17851785
// storeOldBodyInRevTreeAndUpdateCurrent, or if this needs a larger overhaul to support VV
1786-
doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
1786+
doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil, false)
17871787
require.NoError(t, err)
17881788
assert.Equal(t, "test", cv.SourceID)
17891789
assert.Equal(t, incomingVersion, cv.Value)
@@ -1805,7 +1805,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
18051805

18061806
// Attempt to push the same client update, validate server rejects as an already known version and cancels the update.
18071807
// This case doesn't return error, verify that SyncData hasn't been changed.
1808-
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
1808+
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil, false)
18091809
require.NoError(t, err)
18101810
syncData2, err := collection.GetDocSyncData(ctx, "doc1")
18111811
require.NoError(t, err)
@@ -1851,7 +1851,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
18511851
}
18521852

18531853
// assert that a conflict is correctly identified and the doc and cv are nil
1854-
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
1854+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil, false)
18551855
assertHTTPError(t, err, 409)
18561856
require.Nil(t, doc)
18571857
require.Nil(t, cv)
@@ -1891,7 +1891,7 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
18911891
PreviousVersions: pv,
18921892
}
18931893
// call PutExistingCurrentVersion with empty existing doc
1894-
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}, nil)
1894+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}, nil, false)
18951895
require.NoError(t, err)
18961896
assert.NotNil(t, doc)
18971897
// assert on returned CV value

db/import_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func TestOnDemandImportMou(t *testing.T) {
166166
case "PutExistingCurrentVersion":
167167
hlv := NewHybridLogicalVector()
168168
var legacyRevList []string
169-
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, hlv, rawBucketDoc, legacyRevList)
169+
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, hlv, rawBucketDoc, legacyRevList, false)
170170
assertHTTPError(t, err, 409)
171171
default:
172172
require.FailNow(t, fmt.Sprintf("unexpected funcName: %s", funcName))

db/util_testing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,7 @@ func (c *DatabaseCollectionWithUser) UpsertTestDocWithVersion(ctx context.Contex
882882
}
883883
}
884884

885-
doc, _, _, err := c.PutExistingCurrentVersion(ctx, newDoc, newDocHLV, currentBucketDoc, nil)
885+
doc, _, _, err := c.PutExistingCurrentVersion(ctx, newDoc, newDocHLV, currentBucketDoc, nil, false)
886886
require.NoError(t, err)
887887
return doc
888888
}

db/utilities_hlv_testing.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ func (v DocVersion) GoString() string {
3636
return fmt.Sprintf("DocVersion{RevTreeID:%s,CV:%#v}", v.RevTreeID, v.CV)
3737
}
3838

39-
// Can maybe be changed to check both rev and cv when CBG-4790, CBG-4791 are implemented
4039
func (v DocVersion) DocVersionRevTreeEqual(o DocVersion) bool {
4140
if v.RevTreeID != o.RevTreeID {
4241
return false

0 commit comments

Comments
 (0)