Commit c48a4eb

CBG-4434 enable conflict tests

1 parent 367a62c · commit c48a4eb

6 files changed: +190 −104 lines changed

rest/utilities_testing_blip_client.go
23 additions & 16 deletions

@@ -413,6 +413,9 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *db.Hybrid
 	if latestRev == nil {
 		return nil, nil, nil
 	}
+	if latestRev.isDelete {
+		return nil, &latestRev.HLV, &latestRev.version
+	}
 	return latestRev.body, &latestRev.HLV, &latestRev.version
 }

@@ -491,13 +494,12 @@ func (btr *BlipTesterReplicator) Close() {
 }

 // initHandlers sets up the blip client side handles for each message type.
-func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
+func (btr *BlipTesterReplicator) initHandlers(ctx context.Context, btc *BlipTesterClient) {

 	if btr.replicationStats == nil {
 		btr.replicationStats = db.NewBlipSyncStats()
 	}

-	ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil)
 	btr.bt.blipContext.DefaultHandler = btr.defaultHandler()
 	handlers := map[string]func(*blip.Message){
 		db.MessageNoRev: btr.handleNoRev(ctx, btc),

@@ -960,7 +962,8 @@ func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, ve
 		rev.message = msg
 	}

-func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
+// newBlipTesterReplication creates a new BlipTesterReplicator with the given id and BlipTesterClient. Used to instantiate a push or pull replication for the client.
+func newBlipTesterReplication(ctx context.Context, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
 	bt := NewBlipTesterFromSpecWithRT(btc.rt, &BlipTesterSpec{
 		connectingUsername: btc.Username,
 		blipProtocols:      btc.SupportedBLIPProtocols,

@@ -974,13 +977,13 @@ func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, s
 		messages: make(map[blip.MessageNumber]*blip.Message),
 	}

-	r.initHandlers(btc)
+	r.initHandlers(ctx, btc)

 	return r
 }

 // getCollectionsForBLIP returns collections configured by a single database instance on a restTester. If only default collection exists, it will skip returning it to test "legacy" blip mode.
-func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
+func getCollectionsForBLIP(rt *RestTester) []string {
 	dbc := rt.GetDatabase()
 	var collections []string
 	for _, collection := range dbc.CollectionByID {

@@ -995,6 +998,10 @@ func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
 	}

 func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
+	return btcRunner.NewBlipTesterClientOptsWithRTAndContext(rt.Context(), rt, opts)
+}
+
+func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRTAndContext(ctx context.Context, rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
 	if opts == nil {
 		opts = &BlipTesterClientOpts{}
 	}

@@ -1017,7 +1024,7 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
 		hlc: rosmar.NewHybridLogicalClock(0),
 	}
 	btcRunner.clients[client.id] = client
-	client.createBlipTesterReplications()
+	client.createBlipTesterReplications(ctx)

 	return client
 }

@@ -1072,30 +1079,30 @@ func (btc *BlipTesterClient) tearDownBlipClientReplications() {
 }

 // createBlipTesterReplications creates the push and pull replications for the client.
-func (btc *BlipTesterClient) createBlipTesterReplications() {
+func (btc *BlipTesterClient) createBlipTesterReplications(ctx context.Context) {
 	id, err := uuid.NewRandom()
 	require.NoError(btc.TB(), err)

-	btc.pushReplication = newBlipTesterReplication(btc.TB(), "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
-	btc.pullReplication = newBlipTesterReplication(btc.TB(), "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+	btc.pushReplication = newBlipTesterReplication(ctx, "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+	btc.pullReplication = newBlipTesterReplication(ctx, "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)

-	collections := getCollectionsForBLIP(btc.TB(), btc.rt)
+	collections := getCollectionsForBLIP(btc.rt)
 	if !btc.BlipTesterClientOpts.SkipCollectionsInitialization && len(collections) > 0 {
 		btc.collectionClients = make([]*BlipTesterCollectionClient, len(collections))
 		for i, collection := range collections {
-			btc.initCollectionReplication(collection, i)
+			btc.initCollectionReplication(ctx, collection, i)
 		}
 	} else {
-		btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(btc)
+		btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(ctx, btc)
 	}

 	btc.pullReplication.bt.avoidRestTesterClose = true
 	btc.pushReplication.bt.avoidRestTesterClose = true
 }

 // initCollectionReplication initializes a BlipTesterCollectionClient for the given collection.
-func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) {
-	btcReplicator := NewBlipTesterCollectionClient(btc)
+func (btc *BlipTesterClient) initCollectionReplication(ctx context.Context, collection string, collectionIdx int) {
+	btcReplicator := NewBlipTesterCollectionClient(ctx, btc)
 	btcReplicator.collection = collection
 	btcReplicator.collectionIdx = collectionIdx
 	btc.collectionClients[collectionIdx] = btcReplicator

@@ -1466,8 +1473,8 @@ func (btcc *BlipTesterCollectionClient) UnsubPullChanges() {
 }

 // NewBlipTesterCollectionClient creates a collection specific client from a BlipTesterClient
-func NewBlipTesterCollectionClient(btc *BlipTesterClient) *BlipTesterCollectionClient {
-	ctx, ctxCancel := context.WithCancel(btc.rt.Context())
+func NewBlipTesterCollectionClient(ctx context.Context, btc *BlipTesterClient) *BlipTesterCollectionClient {
+	ctx, ctxCancel := context.WithCancel(ctx)
 	l := sync.RWMutex{}
 	c := &BlipTesterCollectionClient{
 		ctx: ctx,
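
The thrust of this file's diff is context plumbing: the logging context is now created once by the caller and threaded through client construction, instead of being rebuilt inside initHandlers. A minimal sketch of how a caller can use the new entry point; only identifiers visible in this diff are assumed, and the helper function itself is hypothetical, not part of the commit:

package rest

import (
	"testing"

	"github.com/couchbase/sync_gateway/base"
)

// newClientWithPeerContext is a sketch: drive a BlipTesterClient with a
// caller-supplied logging context instead of the default rt.Context().
func newClientWithPeerContext(t *testing.T, btcRunner *BlipTestClientRunner, rt *RestTester) *BlipTesterClient {
	// Tag every log line from this client's push/pull replications with a
	// short correlation ID.
	ctx := base.CorrelationIDLogCtx(t.Context(), "client1")

	// Callers that don't need a custom context keep using
	// NewBlipTesterClientOptsWithRT, which now delegates with rt.Context().
	return btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, rt, &BlipTesterClientOpts{
		Username: "user", // illustrative: any user already configured on rt
	})
}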

topologytest/couchbase_lite_mock_peer_test.go
4 additions & 2 deletions

@@ -226,8 +226,10 @@ func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicat
 	}
 	const username = "user"
 	sg.rt.CreateUser(username, []string{"*"})
-	replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRT(sg.rt, &rest.BlipTesterClientOpts{
-		Username: username,
+	// intentionally do not use base.TestCtx to drop test name for readability
+	ctx := base.CorrelationIDLogCtx(sg.rt.TB().Context(), p.name)
+	replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, sg.rt, &rest.BlipTesterClientOpts{
+		Username:               "user",
 		SupportedBLIPProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()},
 		AllowCreationWithoutBlipTesterClientRunner: true,
 		SourceID: p.SourceID(),
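
The inline comment records a deliberate logging choice: base.TestCtx would prefix every log line with the full Go test name, which gets noisy in topology tests with several peers, so the context carries only the peer name as a correlation ID. A hedged sketch of the same pattern in isolation; the helper name is hypothetical:

package topologytest

import (
	"context"
	"testing"

	"github.com/couchbase/sync_gateway/base"
)

// peerLogCtx is a hypothetical helper showing the pattern used above: tag
// logs with a short peer name (e.g. "cbl1") rather than wrapping the context
// with base.TestCtx, which would embed the long test name in every line.
func peerLogCtx(t *testing.T, peerName string) context.Context {
	return base.CorrelationIDLogCtx(t.Context(), peerName)
}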

topologytest/hlv_test.go
113 additions & 11 deletions

@@ -15,6 +15,7 @@ import (

 	"github.com/couchbase/sync_gateway/base"
 	"github.com/couchbase/sync_gateway/db"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -60,14 +61,122 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, doc


 // waitForCVAndBody waits for a document to reach a specific cv on all peers.
+// This is used when asserting on the full HLV is impossible. If XDCR is running, then asserting on the full HLV for
+// non-CBL peers is possible. However, conflict resolution on Couchbase Lite means that Couchbase Lite can contain
+// previous versions of a document.
+//
+// See the following example:
+//
+//	+- - - - - - -+        +- - - - - - -+
+//	' cluster A   '        ' cluster B   '
+//	' +---------+ '        ' +---------+ '
+//	' |  cbs1   | '  <-->  ' |  cbs2   | '
+//	' +---------+ '        ' +---------+ '
+//	' +---------+ '        ' +---------+ '
+//	' |  sg1    | '        ' |  sg2    | '
+//	' +---------+ '        ' +---------+ '
+//	+- - - - - - -+        +- - - - - - -+
+//	       ^                      ^
+//	       |                      |
+//	       v                      v
+//	  +---------+            +---------+
+//	  |  cbl1   |            |  cbl2   |
+//	  +---------+            +---------+
+//
+// All Couchbase Server peers converge on a single HLV, since conflict resolution in XDCR will overwrite the HLV.
+//  1. sg1 creates a unique document, cv:1@rosmar1
+//  2. sg2 creates a unique document, cv:2@rosmar2
+//  3. cbl1 pulls 1@rosmar1
+//  4. cbl2 pulls 2@rosmar2
+//  5. cbs1 pulls 2@rosmar2, overwriting cv:1@rosmar1
+//  6. cbl1 pulls 2@rosmar2, creating cv:2@rosmar2 and retaining pv:1@rosmar1
+// Final state:
+//   - cv:2@rosmar2 on cbs1, cbs2, cbl2
+//   - cv:2@rosmar2, pv:1@rosmar1 on cbl1
 func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
 	t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
 	for _, peer := range topology.SortedPeers() {
 		t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
-		body := peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+		var body db.Body
+		if peer.Type() == PeerTypeCouchbaseLite {
+			body = peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+		} else {
+			body = peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
+		}
 		requireBodyEqual(t, expectedVersion.body, body)
 	}
 }
+
+// waitForConvergingTombstones waits for all peers to have a tombstone document for a given doc ID. A matching HLV is
+// guaranteed for all Couchbase Server / Sync Gateway versions, but not CBL versions.
+//
+// See the following example:
+//
+//	+- - - - - - -+        +- - - - - - -+
+//	' cluster A   '        ' cluster B   '
+//	' +---------+ '        ' +---------+ '
+//	' |  cbs1   | '  <-->  ' |  cbs2   | '
+//	' +---------+ '        ' +---------+ '
+//	' +---------+ '        ' +---------+ '
+//	' |  sg1    | '        ' |  sg2    | '
+//	' +---------+ '        ' +---------+ '
+//	+- - - - - - -+        +- - - - - - -+
+//	       ^                      ^
+//	       |                      |
+//	       v                      v
+//	  +---------+            +---------+
+//	  |  cbl1   |            |  cbl2   |
+//	  +---------+            +---------+
+//
+// Starting state: a converged document + HLV on all peers.
+//  1. cbl1 deletes the document, cv:5@cbl1
+//  2. cbl2 deletes the document, cv:6@cbl2
+//  3. sg1 deletes the document, cv:7@rosmar1
+//  4. sg2 deletes the document, cv:8@rosmar2
+//  5. cbl2 pulls from sg2, creates 8@rosmar2;6@cbl2
+//  6. cbl1 pulls from sg1, creates 7@rosmar1;5@cbl1
+//  7. cbs1 pulls from cbs2, creating cv:8@rosmar2. This version isn't imported, so doesn't get recognized as needing
+//     to replicate to Couchbase Lite.
+//
+// Final state:
+//   - CBS1, CBS2: 8@rosmar2
+//   - CBL1: 7@rosmar1;5@cbl1
+//   - CBL2: 8@rosmar2;6@cbl2
+func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) {
+	t.Logf("waiting for converging tombstones")
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		nonCBLVersions := make(map[string]DocMetadata)
+		CBLVersions := make(map[string]DocMetadata)
+		for peerName, peer := range topology.SortedPeers() {
+			meta, body, exists := peer.GetDocumentIfExists(dsName, docID)
+			if !assert.True(c, exists, "doc %s does not exist on peer %s", docID, peer) {
+				return
+			}
+			if !assert.Nil(c, body, "expected tombstone for doc %s on peer %s", docID, peer) {
+				return
+			}
+			switch peer.Type() {
+			case PeerTypeCouchbaseLite:
+				CBLVersions[peerName] = meta
+			default:
+				nonCBLVersions[peerName] = meta
+			}
+		}
+		var nonCBLVersion *DocMetadata
+		for peer, version := range nonCBLVersions {
+			if nonCBLVersion == nil {
+				nonCBLVersion = &version
+				continue
+			}
+			assertHLVEqual(c, dsName, docID, peer, version, nil, *nonCBLVersion, topology)
+		}
+		// Is there a way to do any assertion on the CBL tombstone versions?
+	}, totalWaitTime, pollInterval)
+}
+
+// waitForTombstoneVersion waits for a tombstone document with a particular HLV to be present on all peers.
 func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
 	t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
 	for _, peer := range topology.SortedPeers() {
@@ -78,16 +187,11 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, d

 // createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
 // returns the last peer to have a doc created on it
-func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func createConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
-		if peer.Type() == PeerTypeCouchbaseLite {
-			// FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434
-			continue
-		}
 		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topology.specDescription)
 		docVersion := peer.CreateDocument(dsName, docID, []byte(docBody))
-		t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
 	index := len(documentVersion) - 1

@@ -98,12 +202,11 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

 // updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
 // returns the last peer to have a doc updated on it.
-func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func updateConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topology.specDescription)
 		docVersion := peer.WriteDocument(dsName, docID, []byte(docBody))
-		t.Logf("updateVersion: %#v", docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
 	index := len(documentVersion) - 1

@@ -114,11 +217,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

 // deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
 // returns the last peer to have a doc deleted on it
-func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func deleteConflictDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		deleteVersion := peer.DeleteDocument(dsName, docID)
-		t.Logf("deleteVersion: %#v", deleteVersion)
 		documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
 	}
 	index := len(documentVersion) - 1
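
Taken together, the removed FIXME in createConflictingDocs and the new wait helpers let multi-actor conflict tests include Couchbase Lite peers again. A sketch of how a test might compose them, under the convergence behavior the comments above describe; test scaffolding, docID, and topology construction are elided, and only identifiers from this diff are assumed:

package topologytest

import (
	"testing"

	"github.com/couchbase/sync_gateway/base"
)

// sketchMultiActorConflict is illustrative only and not part of this commit.
func sketchMultiActorConflict(t *testing.T, dsName base.ScopeAndCollectionName, topology Topology) {
	docID := "doc1" // hypothetical doc ID

	// Conflicting creates on every non-import peer; the last write is the
	// version conflict resolution is expected to converge on.
	lastWrite := createConflictingDocs(dsName, docID, topology)
	// CBL peers may retain pv entries after conflict resolution, so they are
	// checked on cv only; other peers are checked against the full HLV.
	waitForCVAndBody(t, dsName, docID, lastWrite, topology)

	// Conflicting updates follow the same shape.
	lastUpdate := updateConflictingDocs(dsName, docID, topology)
	waitForCVAndBody(t, dsName, docID, lastUpdate, topology)

	// Conflicting deletes: tombstone HLVs are not guaranteed to converge on
	// CBL peers, so wait only for tombstones everywhere plus HLV agreement
	// among the non-CBL peers.
	deleteConflictDocs(dsName, docID, topology)
	waitForConvergingTombstones(t, dsName, docID, topology)
}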
