Skip to content

CBG-4780: basic ISGR push and pull for 4.0 #7668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions db/active_replicator_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ func newActiveReplicatorCommon(ctx context.Context, config *ActiveReplicatorConf
statusKey: metakeys.ReplicationStatusKey(checkpointID),
direction: direction,
}
// CBG-4780: WIll hard code to use < 4 protocols for now, as the ISGR doesn't support 4+ protocols.
arc.config.SupportedBLIPProtocols = []string{CBMobileReplicationV3.SubprotocolString(), CBMobileReplicationV2.SubprotocolString()}

if config.CollectionsEnabled {
arc.namedCollections = make(map[base.ScopeAndCollectionName]*activeReplicatorCollection)
Expand Down
27 changes: 18 additions & 9 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,23 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
expectedSeqs := make(map[IDAndRev]SequenceID, 0)
alreadyKnownSeqs := make([]SequenceID, 0)

versionVectorProtocol := bh.useHLV()

for _, change := range changeList {
docID := change[1].(string)
revID := change[2].(string)
missing, possible := bh.collection.RevDiff(bh.loggingCtx, docID, []string{revID})
rev := change[2].(string)
var missing, possible []string

changeIsVector := false
if versionVectorProtocol {
changeIsVector = strings.Contains(rev, "@")
}
if !versionVectorProtocol || !changeIsVector {
missing, possible = bh.collection.RevDiff(bh.loggingCtx, docID, []string{rev})
} else {
missing, possible = bh.collection.CheckChangeVersion(bh.loggingCtx, docID, rev)
}

if nWritten > 0 {
output.Write([]byte(","))
}
Expand Down Expand Up @@ -740,7 +753,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
if collectionCtx.sgr2PullAlreadyKnownSeqsCallback != nil {
seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse known sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
base.WarnfCtx(bh.loggingCtx, "Unable to parse known sequence %q for %q / %q: %v", change[0], base.UD(docID), rev, err)
Copy link
Preview

Copilot AI Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good use of base.UD() to wrap the docID for user data redaction in logging.

Copilot uses AI. Check for mistakes.

} else {
// we're not able to checkpoint a sequence we can't parse and aren't expecting so just skip the callback if we errored
alreadyKnownSeqs = append(alreadyKnownSeqs, seq)
Expand All @@ -763,9 +776,9 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
seq, err := ParseJSONSequenceID(seqStr(bh.loggingCtx, change[0]))
if err != nil {
// We've already asked for the doc/rev for the sequence so assume we're going to receive it... Just log this and carry on
base.WarnfCtx(bh.loggingCtx, "Unable to parse expected sequence %q for %q / %q: %v", change[0], base.UD(docID), revID, err)
base.WarnfCtx(bh.loggingCtx, "Unable to parse expected sequence %q for %q / %q: %v", change[0], base.UD(docID), rev, err)
Copy link
Preview

Copilot AI Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good use of base.UD() to wrap the docID for user data redaction in logging.

Copilot uses AI. Check for mistakes.

} else {
expectedSeqs[IDAndRev{DocID: docID, RevID: revID}] = seq
expectedSeqs[IDAndRev{DocID: docID, RevID: rev}] = seq
}
}
}
Expand Down Expand Up @@ -1002,10 +1015,6 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}
}

if bh.useHLV() && bh.conflictResolver != nil {
return base.HTTPErrorf(http.StatusNotImplemented, "conflict resolver handling (ISGR) not yet implemented for v4 protocol")
}

// throttle concurrent revs
if cap(bh.inFlightRevsThrottle) > 0 {
select {
Expand Down
31 changes: 31 additions & 0 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3122,6 +3122,37 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un
return doc, rawDocument
}

func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, docid, rev string) (missing, possible []string) {
if strings.HasPrefix(docid, "_design/") && db.user != nil {
return // Users can't upload design docs, so ignore them
}
// todo: CBG-4782 utilise known revs for rev tree property in ISGR by returning know rev tree id's in possible list

doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalSync)
if err != nil {
if !base.IsDocNotFoundError(err) && !base.IsXattrNotFoundError(err) {
base.WarnfCtx(ctx, "Error fetching doc %s during changes handling: %v", base.UD(docid), err)
}
missing = append(missing, rev)
return
}
// parse in coming version, if it's not known to local doc hlv then it is marked as missing, if it is and is a newer version
// then it is also marked as missing
cvValue, err := ParseVersion(rev)
if err != nil {
base.WarnfCtx(ctx, "error parse change version for doc %s: %v", base.UD(docid), err)
missing = append(missing, rev)
return
}
// CBG-4792: enhance here for conflict check - return conflict rev similar to propose changes here link ticket
if doc.HLV.DominatesSource(cvValue) {
// incoming version is dominated by local doc hlv, so it is not missing
return
}
missing = append(missing, rev)
return
}

// ////// REVS_DIFF:

// Given a document ID and a set of revision IDs, looks up which ones are not known. Returns an
Expand Down
7 changes: 7 additions & 0 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,3 +1477,10 @@ func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) {
}
return revSeqNo, nil
}

func (doc *Document) ExtractDocVersion() DocVersion {
return DocVersion{
RevTreeID: doc.CurrentRev,
CV: *doc.HLV.ExtractCurrentVersionFromHLV(),
}
}
3 changes: 2 additions & 1 deletion db/utilities_hlv_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (v DocVersion) GoString() string {
return fmt.Sprintf("DocVersion{RevTreeID:%s,CV:%#v}", v.RevTreeID, v.CV)
}

func (v DocVersion) Equal(o DocVersion) bool {
// Can maybe be changed to check both rev and cv when CBG-4790, CBG-4791 are implemented
func (v DocVersion) DocVersionRevTreeEqual(o DocVersion) bool {
if v.RevTreeID != o.RevTreeID {
return false
}
Expand Down
3 changes: 3 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2752,6 +2752,8 @@ func TestBlipInternalPropertiesHandling(t *testing.T) {
// the stat mapping (processRevStats)
func TestProcessRevIncrementsStat(t *testing.T) {
base.RequireNumTestBuckets(t, 2)
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)
Copy link
Preview

Copilot AI Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug logging setup in tests should be removed before production. This appears to be dev-time logging that was left in.

Copilot uses AI. Check for mistakes.

t.Skip("CBG-4791 - rev tree generated on active is different from passive, this will be mitigated by CBG-4791")

activeRT, remoteRT, remoteURLString, teardown := SetupSGRPeers(t)
defer teardown()
Expand Down Expand Up @@ -2783,6 +2785,7 @@ func TestProcessRevIncrementsStat(t *testing.T) {
require.EqualValues(t, 0, pullStats.HandlePutRevCount.Value())

const docID = "doc"
// need to have this return CV too, pending CBG-4751
version := remoteRT.CreateTestDoc(docID)

assert.NoError(t, ar.Start(activeCtx))
Expand Down
Loading
Loading