
Commit a7c1ad8

CBG-4434 enable conflict tests

1 parent 38f3742 commit a7c1ad8

6 files changed: +154 -106 lines changed

rest/utilities_testing_blip_client.go
Lines changed: 25 additions & 19 deletions

@@ -434,6 +434,9 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *db.Hybrid
     if latestRev == nil {
         return nil, nil, nil
     }
+    if latestRev.isDelete {
+        return nil, &latestRev.HLV, &latestRev.version
+    }
     return latestRev.body, &latestRev.HLV, &latestRev.version
 }

@@ -512,13 +515,12 @@ func (btr *BlipTesterReplicator) Close() {
 }

 // initHandlers sets up the blip client side handles for each message type.
-func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
+func (btr *BlipTesterReplicator) initHandlers(ctx context.Context, btc *BlipTesterClient) {

     if btr.replicationStats == nil {
         btr.replicationStats = db.NewBlipSyncStats()
     }

-    ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil)
     btr.bt.blipContext.DefaultHandler = btr.defaultHandler()
     handlers := map[string]func(*blip.Message){
         db.MessageNoRev: btr.handleNoRev(ctx, btc),

@@ -981,30 +983,30 @@ func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, ve
     rev.message = msg
 }

-func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
-    bt, err := NewBlipTesterFromSpecWithRT(tb, &BlipTesterSpec{
+func newBlipTesterReplication(ctx context.Context, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
+    bt, err := NewBlipTesterFromSpecWithRT(btc.rt.TB(), &BlipTesterSpec{
         connectingPassword: RestTesterDefaultUserPassword,
         connectingUsername: btc.Username,
         connectingUserChannelGrants: btc.Channels,
         blipProtocols: btc.SupportedBLIPProtocols,
         skipCollectionsInitialization: skipCollectionsInitialization,
         origin: btc.origin,
     }, btc.rt)
-    require.NoError(tb, err)
+    require.NoError(btc.rt.TB(), err)

     r := &BlipTesterReplicator{
         id: id,
         bt: bt,
         messages: make(map[blip.MessageNumber]*blip.Message),
     }

-    r.initHandlers(btc)
+    r.initHandlers(ctx, btc)

     return r
 }

 // getCollectionsForBLIP returns collections configured by a single database instance on a restTester. If only default collection exists, it will skip returning it to test "legacy" blip mode.
-func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
+func getCollectionsForBLIP(rt *RestTester) []string {
     dbc := rt.GetDatabase()
     var collections []string
     for _, collection := range dbc.CollectionByID {
@@ -1018,7 +1020,11 @@ func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
     return collections
 }

-func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
+func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
+    return btcRunner.NewBlipTesterClientOptsWithRTAndContext(rt.Context(), rt, opts)
+}
+
+func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRTAndContext(ctx context.Context, rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
     if opts == nil {
         opts = &BlipTesterClientOpts{}
     }

@@ -1041,7 +1047,7 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
         hlc: rosmar.NewHybridLogicalClock(0),
     }
     btcRunner.clients[client.id] = client
-    client.createBlipTesterReplications()
+    client.createBlipTesterReplications(ctx)

     return client
 }
@@ -1096,30 +1102,30 @@ func (btc *BlipTesterClient) tearDownBlipClientReplications() {
 }

 // createBlipTesterReplications creates the push and pull replications for the client.
-func (btc *BlipTesterClient) createBlipTesterReplications() {
+func (btc *BlipTesterClient) createBlipTesterReplications(ctx context.Context) {
     id, err := uuid.NewRandom()
     require.NoError(btc.TB(), err)

-    btc.pushReplication = newBlipTesterReplication(btc.TB(), "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
-    btc.pullReplication = newBlipTesterReplication(btc.TB(), "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+    btc.pushReplication = newBlipTesterReplication(ctx, "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+    btc.pullReplication = newBlipTesterReplication(ctx, "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)

-    collections := getCollectionsForBLIP(btc.TB(), btc.rt)
+    collections := getCollectionsForBLIP(btc.rt)
     if !btc.BlipTesterClientOpts.SkipCollectionsInitialization && len(collections) > 0 {
         btc.collectionClients = make([]*BlipTesterCollectionClient, len(collections))
         for i, collection := range collections {
-            btc.initCollectionReplication(collection, i)
+            btc.initCollectionReplication(ctx, collection, i)
         }
     } else {
-        btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(btc)
+        btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(ctx, btc)
     }

     btc.pullReplication.bt.avoidRestTesterClose = true
     btc.pushReplication.bt.avoidRestTesterClose = true
 }

 // initCollectionReplication initializes a BlipTesterCollectionClient for the given collection.
-func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) {
-    btcReplicator := NewBlipTesterCollectionClient(btc)
+func (btc *BlipTesterClient) initCollectionReplication(ctx context.Context, collection string, collectionIdx int) {
+    btcReplicator := NewBlipTesterCollectionClient(ctx, btc)
     btcReplicator.collection = collection
     btcReplicator.collectionIdx = collectionIdx
     btc.collectionClients[collectionIdx] = btcReplicator

@@ -1490,8 +1496,8 @@ func (btcc *BlipTesterCollectionClient) UnsubPullChanges() {
 }

 // NewBlipTesterCollectionClient creates a collection specific client from a BlipTesterClient
-func NewBlipTesterCollectionClient(btc *BlipTesterClient) *BlipTesterCollectionClient {
-    ctx, ctxCancel := context.WithCancel(btc.rt.Context())
+func NewBlipTesterCollectionClient(ctx context.Context, btc *BlipTesterClient) *BlipTesterCollectionClient {
+    ctx, ctxCancel := context.WithCancel(ctx)
     l := sync.RWMutex{}
     c := &BlipTesterCollectionClient{
         ctx: ctx,
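
The net effect of this file's changes is that BlipTesterClient construction now takes a caller-supplied context.Context instead of deriving one from the RestTester. The following is a minimal sketch of how a test in the rest package might use the new entry point; NewRestTester, rt.Close and btc.Close are assumed from the existing test harness (they do not appear in this diff), and the "client-1" label is illustrative only.

package rest

import (
    "testing"

    "github.com/couchbase/sync_gateway/base"
)

// TestBlipClientCustomLogContext is a hypothetical example, not part of this
// commit; only the constructor and log-context calls come from the diff above.
func TestBlipClientCustomLogContext(t *testing.T) {
    rt := NewRestTester(t, nil)
    defer rt.Close()

    btcRunner := NewBlipTesterClientRunner(t)

    // Label the client's logs with a correlation ID instead of the test name.
    ctx := base.CorrelationIDLogCtx(rt.Context(), "client-1")

    btc := btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, rt, &BlipTesterClientOpts{
        Username: "user",
        Channels: []string{"*"},
    })
    defer btc.Close()

    // ctx now flows through createBlipTesterReplications, newBlipTesterReplication,
    // initHandlers and NewBlipTesterCollectionClient, so handler logging and
    // cancellation derive from it rather than from rt.Context().
}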

topologytest/couchbase_lite_mock_peer_test.go
Lines changed: 3 additions & 1 deletion

@@ -224,7 +224,9 @@ func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicat
         btcRunner: rest.NewBlipTesterClientRunner(sg.rt.TB().(*testing.T)),
         direction: config.direction,
     }
-    replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRT(sg.rt, &rest.BlipTesterClientOpts{
+    // intentionally do not use base.TestCtx to drop test name for readability
+    ctx := base.CorrelationIDLogCtx(sg.rt.TB().Context(), p.name)
+    replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, sg.rt, &rest.BlipTesterClientOpts{
         Username: "user",
         Channels: []string{"*"},
         SupportedBLIPProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()},

topologytest/hlv_test.go
Lines changed: 76 additions & 11 deletions

@@ -15,6 +15,7 @@ import (

     "github.com/couchbase/sync_gateway/base"
     "github.com/couchbase/sync_gateway/db"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )

@@ -60,14 +61,85 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, doc
 }

 // waitForCVAndBody waits for a document to reach a specific cv on all peers.
+// This is used when asserting on the full HLV is not possible. If XDCR is running, the full HLV can still be
+// asserted on non-CBL peers. However, conflict resolution on Couchbase Lite means that Couchbase Lite can contain
+// previous versions of a document.
+//
+// See the following example:
+//
+//  +- - - - - - -+      +- - - - - - -+
+//  ' cluster A   '      ' cluster B   '
+//  ' +---------+ '      ' +---------+ '
+//  ' | cbs1    | ' <--> ' | cbs2    | '
+//  ' +---------+ '      ' +---------+ '
+//  ' +---------+ '      ' +---------+ '
+//  ' | sg1     | '      ' | sg2     | '
+//  ' +---------+ '      ' +---------+ '
+//  +- - - - - - -+      +- - - - - - -+
+//        ^                    ^
+//        |                    |
+//        |                    |
+//        v                    v
+//   +---------+          +---------+
+//   | cbl1    |          | cbl2    |
+//   +---------+          +---------+
+//
+// Couchbase Lite can retain pv entries that are no longer present on Couchbase Server, since conflict resolution
+// in XDCR will overwrite the HLV.
+//  1. sg1 creates unique document cv: 1@rosmar1
+//  2. sg2 creates unique document cv: 2@rosmar2
+//  3. cbl1 pulls 1@rosmar1
+//  4. cbl2 pulls 2@rosmar2
+//  5. cbs1 pulls 2@rosmar2, overwriting cv: 1@rosmar1
+//  6. cbl1 pulls 2@rosmar2, creating cv: 2@rosmar2, pv: 1@rosmar1, overwriting cv: 1@rosmar1
+// Final state:
+//  - cv: 2@rosmar2 on cbs1, cbs2, cbl2
+//  - cv: 2@rosmar2, pv: 1@rosmar1 on cbl1
 func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
     t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
     for _, peer := range topology.SortedPeers() {
         t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
-        body := peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+        var body db.Body
+        if peer.Type() == PeerTypeCouchbaseLite {
+            body = peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+        } else {
+            body = peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
+        }
         requireBodyEqual(t, expectedVersion.body, body)
     }
 }
+
+func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) {
+    t.Logf("waiting for converging tombstones")
+    require.EventuallyWithT(t, func(c *assert.CollectT) {
+        nonCBLVersions := make(map[string]DocMetadata)
+        CBLVersions := make(map[string]DocMetadata)
+        for peerName, peer := range topology.SortedPeers() {
+            meta, body, exists := peer.GetDocumentIfExists(dsName, docID)
+            if !assert.True(c, exists, "doc %s does not exist on peer %s", docID, peer) {
+                return
+            }
+            if !assert.Nil(c, body, "expected tombstone for doc %s on peer %s", docID, peer) {
+                return
+            }
+            switch peer.Type() {
+            case PeerTypeCouchbaseLite:
+                CBLVersions[peerName] = meta
+            default:
+                nonCBLVersions[peerName] = meta
+            }
+        }
+        var nonCBLVersion *DocMetadata
+        for peer, version := range nonCBLVersions {
+            if nonCBLVersion == nil {
+                nonCBLVersion = &version
+                continue
+            }
+            assertHLVEqual(c, dsName, docID, peer, version, nil, *nonCBLVersion, topology)
+        }
+        // TODO: assert on CBL versions?
+    }, totalWaitTime, pollInterval)
+}
+
+// waitForTombstoneVersion waits for a tombstone document with a particular HLV to be present on all peers.
 func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
     t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
     for _, peer := range topology.SortedPeers() {
@@ -78,16 +150,11 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, d

 // createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
 // returns the last peer to have a doc created on it
-func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func createConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
     var documentVersion []BodyAndVersion
     for peerName, peer := range topology.peers.NonImportSortedPeers() {
-        if peer.Type() == PeerTypeCouchbaseLite {
-            // FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434
-            continue
-        }
         docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topology.specDescription)
         docVersion := peer.CreateDocument(dsName, docID, []byte(docBody))
-        t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
         documentVersion = append(documentVersion, docVersion)
     }
     index := len(documentVersion) - 1

@@ -98,12 +165,11 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

 // updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
 // returns the last peer to have a doc updated on it.
-func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func updateConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
     var documentVersion []BodyAndVersion
     for peerName, peer := range topology.peers.NonImportSortedPeers() {
         docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topology.specDescription)
         docVersion := peer.WriteDocument(dsName, docID, []byte(docBody))
-        t.Logf("updateVersion: %#v", docVersion.docMeta)
         documentVersion = append(documentVersion, docVersion)
     }
     index := len(documentVersion) - 1

@@ -114,11 +180,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc

 // deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
 // returns the last peer to have a doc deleted on it
-func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func deleteConflictDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
     var documentVersion []BodyAndVersion
     for peerName, peer := range topology.peers.NonImportSortedPeers() {
         deleteVersion := peer.DeleteDocument(dsName, docID)
-        t.Logf("deleteVersion: %#v", deleteVersion)
         documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
     }
     index := len(documentVersion) - 1
