
CBG-4434 enable conflict tests #7688


Open

wants to merge 3 commits into main

39 changes: 23 additions & 16 deletions rest/utilities_testing_blip_client.go
@@ -413,6 +413,9 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *db.Hybrid
if latestRev == nil {
return nil, nil, nil
}
if latestRev.isDelete {
return nil, &latestRev.HLV, &latestRev.version
}
return latestRev.body, &latestRev.HLV, &latestRev.version
}
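
Not part of the diff: a minimal sketch of how a test could consume the tombstone-aware GetDoc behaviour added above. The helper name requireTombstone is hypothetical; it assumes the three return values shown in the hunk (body, HLV, version) plus the testing and testify require packages this file already uses.

// requireTombstone is a hypothetical helper illustrating the new contract:
// a deleted document returns a nil body but still exposes its HLV and version.
func requireTombstone(t *testing.T, btcc *BlipTesterCollectionClient, docID string) {
	body, hlv, version := btcc.GetDoc(docID)
	require.Nil(t, body, "expected tombstone body for doc %q", docID)
	require.NotNil(t, hlv, "tombstone should still carry an HLV")
	require.NotNil(t, version, "tombstone should still carry a version")
}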

@@ -491,13 +494,12 @@ func (btr *BlipTesterReplicator) Close() {
}

// initHandlers sets up the blip client side handles for each message type.
func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
func (btr *BlipTesterReplicator) initHandlers(ctx context.Context, btc *BlipTesterClient) {

if btr.replicationStats == nil {
btr.replicationStats = db.NewBlipSyncStats()
}

ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil)
btr.bt.blipContext.DefaultHandler = btr.defaultHandler()
handlers := map[string]func(*blip.Message){
db.MessageNoRev: btr.handleNoRev(ctx, btc),
@@ -960,7 +962,8 @@ func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, ve
rev.message = msg
}

func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
// newBlipTesterReplication creates a new BlipTesterReplicator with the given id and BlipTesterClient. Used to instantiate a push or pull replication for the client.
func newBlipTesterReplication(ctx context.Context, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
bt := NewBlipTesterFromSpecWithRT(btc.rt, &BlipTesterSpec{
connectingUsername: btc.Username,
blipProtocols: btc.SupportedBLIPProtocols,
@@ -974,13 +977,13 @@ func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, s
messages: make(map[blip.MessageNumber]*blip.Message),
}

r.initHandlers(btc)
r.initHandlers(ctx, btc)

return r
}

// getCollectionsForBLIP returns collections configured by a single database instance on a restTester. If only default collection exists, it will skip returning it to test "legacy" blip mode.
func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
func getCollectionsForBLIP(rt *RestTester) []string {
dbc := rt.GetDatabase()
var collections []string
for _, collection := range dbc.CollectionByID {
@@ -995,6 +998,10 @@ func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
}

func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
return btcRunner.NewBlipTesterClientOptsWithRTAndContext(rt.Context(), rt, opts)
}

func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRTAndContext(ctx context.Context, rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
if opts == nil {
opts = &BlipTesterClientOpts{}
}
@@ -1017,7 +1024,7 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
hlc: rosmar.NewHybridLogicalClock(0),
}
btcRunner.clients[client.id] = client
client.createBlipTesterReplications()
client.createBlipTesterReplications(ctx)

return client
}
@@ -1072,30 +1079,30 @@ func (btc *BlipTesterClient) tearDownBlipClientReplications() {
}

// createBlipTesterReplications creates the push and pull replications for the client.
func (btc *BlipTesterClient) createBlipTesterReplications() {
func (btc *BlipTesterClient) createBlipTesterReplications(ctx context.Context) {
id, err := uuid.NewRandom()
require.NoError(btc.TB(), err)

btc.pushReplication = newBlipTesterReplication(btc.TB(), "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
btc.pullReplication = newBlipTesterReplication(btc.TB(), "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
btc.pushReplication = newBlipTesterReplication(ctx, "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
btc.pullReplication = newBlipTesterReplication(ctx, "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)

collections := getCollectionsForBLIP(btc.TB(), btc.rt)
collections := getCollectionsForBLIP(btc.rt)
if !btc.BlipTesterClientOpts.SkipCollectionsInitialization && len(collections) > 0 {
btc.collectionClients = make([]*BlipTesterCollectionClient, len(collections))
for i, collection := range collections {
btc.initCollectionReplication(collection, i)
btc.initCollectionReplication(ctx, collection, i)
}
} else {
btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(btc)
btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(ctx, btc)
}

btc.pullReplication.bt.avoidRestTesterClose = true
btc.pushReplication.bt.avoidRestTesterClose = true
}

// initCollectionReplication initializes a BlipTesterCollectionClient for the given collection.
func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) {
btcReplicator := NewBlipTesterCollectionClient(btc)
func (btc *BlipTesterClient) initCollectionReplication(ctx context.Context, collection string, collectionIdx int) {
btcReplicator := NewBlipTesterCollectionClient(ctx, btc)
btcReplicator.collection = collection
btcReplicator.collectionIdx = collectionIdx
btc.collectionClients[collectionIdx] = btcReplicator
@@ -1466,8 +1473,8 @@ func (btcc *BlipTesterCollectionClient) UnsubPullChanges() {
}

// NewBlipTesterCollectionClient creates a collection specific client from a BlipTesterClient
func NewBlipTesterCollectionClient(btc *BlipTesterClient) *BlipTesterCollectionClient {
ctx, ctxCancel := context.WithCancel(btc.rt.Context())
func NewBlipTesterCollectionClient(ctx context.Context, btc *BlipTesterClient) *BlipTesterCollectionClient {
ctx, ctxCancel := context.WithCancel(ctx)
l := sync.RWMutex{}
c := &BlipTesterCollectionClient{
ctx: ctx,
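
For orientation, not part of the diff: a minimal sketch of the new context-aware entry point added above. The existing NewBlipTesterClientOptsWithRT wrapper keeps its old behaviour by defaulting to rt.Context(). The variable names, the "alice" user, and the "cbl1" correlation ID are illustrative and assume the usual RestTester / BlipTestClientRunner test setup.

// Hypothetical usage: tag the client's log context with a correlation ID
// (here a peer name) instead of the test name, mirroring the topologytest
// change below. Assumes rt is a *RestTester, btcRunner a *BlipTestClientRunner,
// and that the user "alice" has already been created.
ctx := base.CorrelationIDLogCtx(rt.Context(), "cbl1")
btc := btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, rt, &BlipTesterClientOpts{
	Username: "alice",
})
defer btc.Close()
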
6 changes: 4 additions & 2 deletions topologytest/couchbase_lite_mock_peer_test.go
@@ -226,8 +226,10 @@ func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicat
}
const username = "user"
sg.rt.CreateUser(username, []string{"*"})
replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRT(sg.rt, &rest.BlipTesterClientOpts{
Username: username,
// intentionally do not use base.TestCtx to drop test name for readability
ctx := base.CorrelationIDLogCtx(sg.rt.TB().Context(), p.name)
replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, sg.rt, &rest.BlipTesterClientOpts{
Username: "user",
SupportedBLIPProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()},
AllowCreationWithoutBlipTesterClientRunner: true,
SourceID: p.SourceID(),
125 changes: 114 additions & 11 deletions topologytest/hlv_test.go
@@ -15,6 +15,7 @@ import (

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -60,14 +61,123 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, doc
}

// waitForCVAndBody waits for a document to reach a specific cv on all peers.
// This is used for scenarios where it's valid for the full HLV to not converge. This includes cases where
// CBL conflict resolution results in additional history in the CBL version of the HLV that may not be pushed
// to CBS (e.g. remote wins)
//
// See following example:
//
// +- - - - - - -+ +- - - - - - -+
// ' cluster A ' ' cluster B '
// ' +---------+ ' ' +---------+ '
// ' | cbs1 | ' <--> ' | cbs2 | '
// ' +---------+ ' ' +---------+ '
// ' +---------+ ' ' +---------+ '
// ' | sg1 | ' ' | sg2 | '
// ' +---------+ ' ' +---------+ '
// +- - - - -- - + +- - - - - - -+
// ^ ^
// | |
// | |
// v v
// +---------+ +---------+
// | cbl1 | | cbl2 |
// +---------+ +---------+
//
// The full HLV on Couchbase Lite may not match Couchbase Server, since conflict resolution in XDCR will overwrite the HLV.
// 1. sg1 creates unique document cv: 1@rosmar1
// 2. sg2 creates unique document cv: 2@rosmar2
// 3. cbl1 pulls 1@rosmar1
// 4. cbl2 pulls 2@rosmar2
// 5. cbs1 pulls 2@rosmar2, overwriting cv:1@rosmar1
// 6. cbl1 pulls 2@rosmar2, creating cv: 2@rosmar2, pv:1@rosmar1, overwriting cv:1@rosmar1
// Final state:
// - cv:2@rosmar2 on cbs1, cbs2, cbl2
// - cv:2@rosmar2, pv:1@rosmar1 on cbl1
func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
for _, peer := range topology.SortedPeers() {
t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
body := peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
var body db.Body
if peer.Type() == PeerTypeCouchbaseLite {
body = peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
} else {
body = peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
}
requireBodyEqual(t, expectedVersion.body, body)
}
}
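
To make the scenario in the waitForCVAndBody comment concrete, here is a self-contained toy model, not part of the diff, of the simplified rule that pulling a winning remote cv demotes the local cv into pv. Real HLV merging has more cases; this only reproduces the cbl1 outcome described above.

package main

import "fmt"

// hlv is a toy model: a current version (cv) plus previous versions (pv).
type hlv struct {
	cv string
	pv []string
}

// pull models accepting a winning remote cv: a differing local cv is demoted into pv.
func (h *hlv) pull(remoteCV string) {
	if h.cv != "" && h.cv != remoteCV {
		h.pv = append(h.pv, h.cv)
	}
	h.cv = remoteCV
}

func main() {
	cbl1 := &hlv{}
	cbl1.pull("1@rosmar1") // step 3: cbl1 pulls sg1's write
	cbl1.pull("2@rosmar2") // step 6: cbl1 pulls the XDCR winner via sg1
	// Prints cv=2@rosmar2 pv=[1@rosmar1], matching the final state on cbl1;
	// cbs1/cbs2 show only cv=2@rosmar2 because XDCR overwrote the HLV.
	fmt.Printf("cbl1: cv=%s pv=%v\n", cbl1.cv, cbl1.pv)
}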

// waitForConvergingTombstones waits for all peers to have a tombstone document for a given doc ID. This is the
// equivalent function to waitForCVAndBody if the expected document is a tombstone.
//
// Couchbase Server and Sync Gateway peers will have matching HLVs due to XDCR conflict resolution always overwriting
// HLVs. However, it is possible that XDCR will replicate a tombstone from one Couchbase Server to another Couchbase
// Server and update its HLV. Since tombstones are not imported by Sync Gateway, this CV will not be replicated to
// Couchbase Lite.
//
// In this case, all peers will have a tombstone for this document, but no assertions can be made on Couchbase Lite
// peers. See following example:
//
// +- - - - - - -+ +- - - - - - -+
// ' cluster A ' ' cluster B '
// ' +---------+ ' ' +---------+ '
// ' | cbs1 | ' <--> ' | cbs2 | '
// ' +---------+ ' ' +---------+ '
// ' +---------+ ' ' +---------+ '
// ' | sg1 | ' ' | sg2 | '
// ' +---------+ ' ' +---------+ '
// +- - - - - - -+ +- - - - - - -+
// ^ ^
// | |
// | |
// v v
// +---------+ +---------+
// | cbl1 | | cbl2 |
// +---------+ +---------+
//
// Initially, there is a converged document + HLV on all peers.
// 1. cbl1 deletes document cv: 5@cbl1
// 2. cbl2 deletes document cv: 6@cbl2
// 3. sg1 deletes document cv: 7@rosmar1
// 4. sg2 deletes document cv: 8@rosmar2
// 5. cbl2 pulls from sg2, creates 8@rosmar2;6@cbl2
// 6. cbl1 pulls from sg1, creates 7@rosmar1;5@cbl1
// 7. cbs1 pulls from cbs2, creating cv:8@rosmar2. This version isn't imported, so doesn't get recognized as needing
// to replicate to Couchbase Lite.
//
// Final state:
// - CBS1, CBS2: 8@rosmar2
// - CBL1: 7@rosmar1;5@cbl1
// - CBL2: 8@rosmar2;6@cbl2
func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) {
t.Logf("waiting for converging tombstones")
require.EventuallyWithT(t, func(c *assert.CollectT) {
nonCBLVersions := make(map[string]DocMetadata)
for peerName, peer := range topology.SortedPeers() {
meta, body, exists := peer.GetDocumentIfExists(dsName, docID)
if !assert.True(c, exists, "doc %s does not exist on peer %s", docID, peer) {
return
}
if !assert.Nil(c, body, "expected tombstone for doc %s on peer %s", docID, peer) {
return
}
if peer.Type() != PeerTypeCouchbaseLite {
nonCBLVersions[peerName] = meta
}
}
var nonCBLVersion *DocMetadata
for peer, version := range nonCBLVersions {
if nonCBLVersion == nil {
nonCBLVersion = &version
continue
}
assertHLVEqual(c, dsName, docID, peer, version, nil, *nonCBLVersion, topology)
}
}, totalWaitTime, pollInterval)
}
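
A standalone sketch, not part of the diff, of the convergence criterion this helper enforces, using the final state from the comment above: every peer must hold a tombstone, non-Couchbase-Lite peers must agree on one version, and Couchbase Lite peers are excluded from the HLV comparison. The peer names and versions are copied from the example.

package main

import "fmt"

func main() {
	// Final tombstone versions copied from the scenario above.
	finalCV := map[string]string{
		"cbs1": "8@rosmar2",
		"cbs2": "8@rosmar2",
		"cbl1": "7@rosmar1;5@cbl1",
		"cbl2": "8@rosmar2;6@cbl2",
	}
	isCBL := map[string]bool{"cbl1": true, "cbl2": true}

	// Mirror of the helper's check: only non-CBL peers must share one version.
	var ref string
	converged := true
	for peer, cv := range finalCV {
		if isCBL[peer] {
			continue // CBL peers only need to hold some tombstone
		}
		if ref == "" {
			ref = cv
		} else if cv != ref {
			converged = false
		}
	}
	fmt.Println("non-CBL tombstone versions converged:", converged) // true
}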

// waitForTombstoneVersion waits for a tombstone document with a particular HLV to be present on all peers.
func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
for _, peer := range topology.SortedPeers() {
@@ -78,16 +188,11 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, d

// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
// returns the last peer to have a doc created on it
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
func createConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
var documentVersion []BodyAndVersion
for peerName, peer := range topology.peers.NonImportSortedPeers() {
if peer.Type() == PeerTypeCouchbaseLite {
// FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434
continue
}
docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topology.specDescription)
docVersion := peer.CreateDocument(dsName, docID, []byte(docBody))
t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
documentVersion = append(documentVersion, docVersion)
}
index := len(documentVersion) - 1
@@ -98,12 +203,11 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
// returns the last peer to have a doc updated on it.
func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
func updateConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
var documentVersion []BodyAndVersion
for peerName, peer := range topology.peers.NonImportSortedPeers() {
docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topology.specDescription)
docVersion := peer.WriteDocument(dsName, docID, []byte(docBody))
t.Logf("updateVersion: %#v", docVersion.docMeta)
documentVersion = append(documentVersion, docVersion)
}
index := len(documentVersion) - 1
@@ -114,11 +218,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
// returns the last peer to have a doc deleted on it
func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
func deleteConflictDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
var documentVersion []BodyAndVersion
for peerName, peer := range topology.peers.NonImportSortedPeers() {
deleteVersion := peer.DeleteDocument(dsName, docID)
t.Logf("deleteVersion: %#v", deleteVersion)
documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
}
index := len(documentVersion) - 1