Skip to content

Commit 972d34b

Browse files
authored
CBG-4780: basic ISGR push and pull for 4.0 (#7668)
1 parent 14b0b7e commit 972d34b

File tree

9 files changed

+198
-117
lines changed

9 files changed

+198
-117
lines changed

db/active_replicator_common.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ func newActiveReplicatorCommon(ctx context.Context, config *ActiveReplicatorConf
123123
statusKey: metakeys.ReplicationStatusKey(checkpointID),
124124
direction: direction,
125125
}
126-
// CBG-4780: WIll hard code to use < 4 protocols for now, as the ISGR doesn't support 4+ protocols.
127-
arc.config.SupportedBLIPProtocols = []string{CBMobileReplicationV3.SubprotocolString(), CBMobileReplicationV2.SubprotocolString()}
128126

129127
if config.CollectionsEnabled {
130128
arc.namedCollections = make(map[base.ScopeAndCollectionName]*activeReplicatorCollection)

db/blip_handler.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -695,10 +695,23 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
695695
expectedSeqs := make(map[IDAndRev]SequenceID, 0)
696696
alreadyKnownSeqs := make([]SequenceID, 0)
697697

698+
versionVectorProtocol := bh.useHLV()
699+
698700
for _, change := range changeList {
699701
docID := change[1].(string)
700-
revID := change[2].(string)
701-
missing, possible := bh.collection.RevDiff(bh.loggingCtx, docID, []string{revID})
702+
rev := change[2].(string)
703+
var missing, possible []string
704+
705+
changeIsVector := false
706+
if versionVectorProtocol {
707+
changeIsVector = strings.Contains(rev, "@")
708+
}
709+
if !versionVectorProtocol || !changeIsVector {
710+
missing, possible = bh.collection.RevDiff(bh.loggingCtx, docID, []string{rev})
711+
} else {
712+
missing, possible = bh.collection.CheckChangeVersion(bh.loggingCtx, docID, rev)
713+
}
714+
702715
if nWritten > 0 {
703716
output.Write([]byte(","))
704717
}
@@ -740,7 +753,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
740753
if collectionCtx.sgr2PullAlreadyKnownSeqsCallback != nil {
741754
seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
742755
if err != nil {
743-
base.WarnfCtx(bh.loggingCtx, "Unable to parse known sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
756+
base.WarnfCtx(bh.loggingCtx, "Unable to parse known sequence %q for %q / %q: %v", change[0], base.UD(docID), rev, err)
744757
} else {
745758
// we're not able to checkpoint a sequence we can't parse and aren't expecting so just skip the callback if we errored
746759
alreadyKnownSeqs = append(alreadyKnownSeqs, seq)
@@ -763,9 +776,9 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
763776
seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
764777
if err != nil {
765778
// We've already asked for the doc/rev for the sequence so assume we're going to receive it... Just log this and carry on
766-
base.WarnfCtx(bh.loggingCtx, "Unable to parse expected sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
779+
base.WarnfCtx(bh.loggingCtx, "Unable to parse expected sequence %q for %q / %q: %v", change[0], base.UD(docID), rev, err)
767780
} else {
768-
expectedSeqs[IDAndRev{DocID: docID, RevID: revID}] = seq
781+
expectedSeqs[IDAndRev{DocID: docID, RevID: rev}] = seq
769782
}
770783
}
771784
}
@@ -1002,10 +1015,6 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
10021015
}
10031016
}
10041017

1005-
if bh.useHLV() && bh.conflictResolver != nil {
1006-
return base.HTTPErrorf(http.StatusNotImplemented, "conflict resolver handling (ISGR) not yet implemented for v4 protocol")
1007-
}
1008-
10091018
// throttle concurrent revs
10101019
if cap(bh.inFlightRevsThrottle) > 0 {
10111020
select {

db/crud.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3122,6 +3122,37 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un
31223122
return doc, rawDocument
31233123
}
31243124

3125+
func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, docid, rev string) (missing, possible []string) {
3126+
if strings.HasPrefix(docid, "_design/") && db.user != nil {
3127+
return // Users can't upload design docs, so ignore them
3128+
}
3129+
// todo: CBG-4782 utilise known revs for rev tree property in ISGR by returning know rev tree id's in possible list
3130+
3131+
doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalSync)
3132+
if err != nil {
3133+
if !base.IsDocNotFoundError(err) && !base.IsXattrNotFoundError(err) {
3134+
base.WarnfCtx(ctx, "Error fetching doc %s during changes handling: %v", base.UD(docid), err)
3135+
}
3136+
missing = append(missing, rev)
3137+
return
3138+
}
3139+
// parse in coming version, if it's not known to local doc hlv then it is marked as missing, if it is and is a newer version
3140+
// then it is also marked as missing
3141+
cvValue, err := ParseVersion(rev)
3142+
if err != nil {
3143+
base.WarnfCtx(ctx, "error parse change version for doc %s: %v", base.UD(docid), err)
3144+
missing = append(missing, rev)
3145+
return
3146+
}
3147+
// CBG-4792: enhance here for conflict check - return conflict rev similar to propose changes here link ticket
3148+
if doc.HLV.DominatesSource(cvValue) {
3149+
// incoming version is dominated by local doc hlv, so it is not missing
3150+
return
3151+
}
3152+
missing = append(missing, rev)
3153+
return
3154+
}
3155+
31253156
// ////// REVS_DIFF:
31263157

31273158
// Given a document ID and a set of revision IDs, looks up which ones are not known. Returns an

db/document.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,3 +1477,10 @@ func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) {
14771477
}
14781478
return revSeqNo, nil
14791479
}
1480+
1481+
func (doc *Document) ExtractDocVersion() DocVersion {
1482+
return DocVersion{
1483+
RevTreeID: doc.CurrentRev,
1484+
CV: *doc.HLV.ExtractCurrentVersionFromHLV(),
1485+
}
1486+
}

db/utilities_hlv_testing.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ func (v DocVersion) GoString() string {
3636
return fmt.Sprintf("DocVersion{RevTreeID:%s,CV:%#v}", v.RevTreeID, v.CV)
3737
}
3838

39-
func (v DocVersion) Equal(o DocVersion) bool {
39+
// Can maybe be changed to check both rev and cv when CBG-4790, CBG-4791 are implemented
40+
func (v DocVersion) DocVersionRevTreeEqual(o DocVersion) bool {
4041
if v.RevTreeID != o.RevTreeID {
4142
return false
4243
}

rest/blip_api_crud_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2752,6 +2752,8 @@ func TestBlipInternalPropertiesHandling(t *testing.T) {
27522752
// the stat mapping (processRevStats)
27532753
func TestProcessRevIncrementsStat(t *testing.T) {
27542754
base.RequireNumTestBuckets(t, 2)
2755+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)
2756+
t.Skip("CBG-4791 - rev tree generated on active is different from passive, this will be mitigated by CBG-4791")
27552757

27562758
activeRT, remoteRT, remoteURLString, teardown := SetupSGRPeers(t)
27572759
defer teardown()
@@ -2783,6 +2785,7 @@ func TestProcessRevIncrementsStat(t *testing.T) {
27832785
require.EqualValues(t, 0, pullStats.HandlePutRevCount.Value())
27842786

27852787
const docID = "doc"
2788+
// need to have this return CV too, pending CBG-4751
27862789
version := remoteRT.CreateTestDoc(docID)
27872790

27882791
assert.NoError(t, ar.Start(activeCtx))

0 commit comments

Comments
 (0)