
Commit 335e7cf

CBG-4434 enable conflict tests
1 parent 367a62c commit 335e7cf

File tree

6 files changed (+153, -104 lines)

rest/utilities_testing_blip_client.go

Lines changed: 23 additions & 16 deletions
@@ -413,6 +413,9 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *db.Hybrid
 	if latestRev == nil {
 		return nil, nil, nil
 	}
+	if latestRev.isDelete {
+		return nil, &latestRev.HLV, &latestRev.version
+	}
 	return latestRev.body, &latestRev.HLV, &latestRev.version
 }

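As an aside for readers of this diff, a minimal sketch of how a test might consume the new tombstone behaviour of GetDoc, assuming it lives alongside the changed file so the existing imports apply; the requireTombstone helper is hypothetical and not part of this commit.

func requireTombstone(t *testing.T, btcc *BlipTesterCollectionClient, docID string) {
	// For a revision marked isDelete, GetDoc now returns a nil body while still
	// exposing the revision's HLV and version for assertions.
	body, hlv, version := btcc.GetDoc(docID)
	require.Nil(t, body)
	require.NotNil(t, hlv)
	require.NotNil(t, version)
}
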
@@ -491,13 +494,12 @@ func (btr *BlipTesterReplicator) Close() {
 }
 
 // initHandlers sets up the blip client side handles for each message type.
-func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
+func (btr *BlipTesterReplicator) initHandlers(ctx context.Context, btc *BlipTesterClient) {
 
 	if btr.replicationStats == nil {
 		btr.replicationStats = db.NewBlipSyncStats()
 	}
 
-	ctx := base.DatabaseLogCtx(base.TestCtx(btr.bt.restTester.TB()), btr.bt.restTester.GetDatabase().Name, nil)
 	btr.bt.blipContext.DefaultHandler = btr.defaultHandler()
 	handlers := map[string]func(*blip.Message){
 		db.MessageNoRev: btr.handleNoRev(ctx, btc),
@@ -960,7 +962,8 @@ func (btcc *BlipTesterCollectionClient) updateLastReplicatedRev(docID string, ve
 	rev.message = msg
 }
 
-func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
+// newBlipTesterReplication creates a new BlipTesterReplicator with the given id and BlipTesterClient. Used to instantiate a push or pull replication for the client.
+func newBlipTesterReplication(ctx context.Context, id string, btc *BlipTesterClient, skipCollectionsInitialization bool) *BlipTesterReplicator {
 	bt := NewBlipTesterFromSpecWithRT(btc.rt, &BlipTesterSpec{
 		connectingUsername: btc.Username,
 		blipProtocols:      btc.SupportedBLIPProtocols,
@@ -974,13 +977,13 @@ func newBlipTesterReplication(tb testing.TB, id string, btc *BlipTesterClient, s
 		messages: make(map[blip.MessageNumber]*blip.Message),
 	}
 
-	r.initHandlers(btc)
+	r.initHandlers(ctx, btc)
 
 	return r
 }
 
 // getCollectionsForBLIP returns collections configured by a single database instance on a restTester. If only default collection exists, it will skip returning it to test "legacy" blip mode.
-func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
+func getCollectionsForBLIP(rt *RestTester) []string {
 	dbc := rt.GetDatabase()
 	var collections []string
 	for _, collection := range dbc.CollectionByID {
@@ -995,6 +998,10 @@ func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string {
 }
 
 func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
+	return btcRunner.NewBlipTesterClientOptsWithRTAndContext(rt.Context(), rt, opts)
+}
+
+func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRTAndContext(ctx context.Context, rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) {
 	if opts == nil {
 		opts = &BlipTesterClientOpts{}
 	}
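A hedged usage sketch of the two entry points above, assuming it sits in the same package so the existing imports apply; the values "alice" and "cbl1" are illustrative only. The existing helper keeps its old behaviour by defaulting to rt.Context(), while the new variant lets a caller thread its own logging context through the client.

func newClientsSketch(btcRunner *BlipTestClientRunner, rt *RestTester) {
	opts := &BlipTesterClientOpts{Username: "alice"}

	// Existing call sites are unchanged; the client context defaults to rt.Context().
	btcDefault := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)

	// New call sites can supply their own context, e.g. one tagged with a correlation ID.
	ctx := base.CorrelationIDLogCtx(rt.TB().Context(), "cbl1")
	btcWithCtx := btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, rt, opts)

	_, _ = btcDefault, btcWithCtx
}
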
@@ -1017,7 +1024,7 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
 		hlc: rosmar.NewHybridLogicalClock(0),
 	}
 	btcRunner.clients[client.id] = client
-	client.createBlipTesterReplications()
+	client.createBlipTesterReplications(ctx)
 
 	return client
 }
@@ -1072,30 +1079,30 @@ func (btc *BlipTesterClient) tearDownBlipClientReplications() {
 }
 
 // createBlipTesterReplications creates the push and pull replications for the client.
-func (btc *BlipTesterClient) createBlipTesterReplications() {
+func (btc *BlipTesterClient) createBlipTesterReplications(ctx context.Context) {
 	id, err := uuid.NewRandom()
 	require.NoError(btc.TB(), err)
 
-	btc.pushReplication = newBlipTesterReplication(btc.TB(), "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
-	btc.pullReplication = newBlipTesterReplication(btc.TB(), "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+	btc.pushReplication = newBlipTesterReplication(ctx, "push"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
+	btc.pullReplication = newBlipTesterReplication(ctx, "pull"+id.String(), btc, btc.BlipTesterClientOpts.SkipCollectionsInitialization)
 
-	collections := getCollectionsForBLIP(btc.TB(), btc.rt)
+	collections := getCollectionsForBLIP(btc.rt)
 	if !btc.BlipTesterClientOpts.SkipCollectionsInitialization && len(collections) > 0 {
 		btc.collectionClients = make([]*BlipTesterCollectionClient, len(collections))
 		for i, collection := range collections {
-			btc.initCollectionReplication(collection, i)
+			btc.initCollectionReplication(ctx, collection, i)
 		}
 	} else {
-		btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(btc)
+		btc.nonCollectionAwareClient = NewBlipTesterCollectionClient(ctx, btc)
 	}
 
 	btc.pullReplication.bt.avoidRestTesterClose = true
 	btc.pushReplication.bt.avoidRestTesterClose = true
 }
 
 // initCollectionReplication initializes a BlipTesterCollectionClient for the given collection.
-func (btc *BlipTesterClient) initCollectionReplication(collection string, collectionIdx int) {
-	btcReplicator := NewBlipTesterCollectionClient(btc)
+func (btc *BlipTesterClient) initCollectionReplication(ctx context.Context, collection string, collectionIdx int) {
+	btcReplicator := NewBlipTesterCollectionClient(ctx, btc)
 	btcReplicator.collection = collection
 	btcReplicator.collectionIdx = collectionIdx
 	btc.collectionClients[collectionIdx] = btcReplicator
@@ -1466,8 +1473,8 @@ func (btcc *BlipTesterCollectionClient) UnsubPullChanges() {
 }
 
 // NewBlipTesterCollectionClient creates a collection specific client from a BlipTesterClient
-func NewBlipTesterCollectionClient(btc *BlipTesterClient) *BlipTesterCollectionClient {
-	ctx, ctxCancel := context.WithCancel(btc.rt.Context())
+func NewBlipTesterCollectionClient(ctx context.Context, btc *BlipTesterClient) *BlipTesterCollectionClient {
+	ctx, ctxCancel := context.WithCancel(ctx)
 	l := sync.RWMutex{}
 	c := &BlipTesterCollectionClient{
 		ctx: ctx,

topologytest/couchbase_lite_mock_peer_test.go

Lines changed: 4 additions & 2 deletions
@@ -226,8 +226,10 @@ func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicat
 	}
 	const username = "user"
 	sg.rt.CreateUser(username, []string{"*"})
-	replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRT(sg.rt, &rest.BlipTesterClientOpts{
-		Username: username,
+	// intentionally do not use base.TestCtx to drop test name for readability
+	ctx := base.CorrelationIDLogCtx(sg.rt.TB().Context(), p.name)
+	replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRTAndContext(ctx, sg.rt, &rest.BlipTesterClientOpts{
+		Username: "user",
 		SupportedBLIPProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()},
 		AllowCreationWithoutBlipTesterClientRunner: true,
 		SourceID: p.SourceID(),

topologytest/hlv_test.go

Lines changed: 76 additions & 11 deletions
@@ -15,6 +15,7 @@ import (
 
 	"github.com/couchbase/sync_gateway/base"
 	"github.com/couchbase/sync_gateway/db"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -60,14 +61,85 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, doc
 }
 
 // waitForCVAndBody waits for a document to reach a specific cv on all peers.
+// This is used when asserting on the full HLV is impossible. If XDCR is running, then asserting on the full HLV for
+// non-CBL peers is possible. However, conflict resolution on Couchbase Lite means that Couchbase Lite can contain
+// previous versions of a document.
+//
+// See following example:
+//	- - - - - - - +        +- - - - - - -+
+//	' cluster A   '        ' cluster B   '
+//	' +---------+ '        ' +---------+ '
+//	' | cbs1    | '  <-->  ' | cbs2    | '
+//	' +---------+ '        ' +---------+ '
+//	' +---------+ '        ' +---------+ '
+//	' | sg1     | '        ' | sg2     | '
+//	' +---------+ '        ' +---------+ '
+//	- - - - - - - +        +- - - - - - -+
+//	      ^                       ^
+//	      |                       |
+//	      |                       |
+//	      v                       v
+//	 +---------+             +---------+
+//	 |  cbl1   |             |  cbl2   |
+//	 +---------+             +---------+
+//
+// Couchbase Lite can retain previous versions that are no longer present on
+// Couchbase Server, since conflict resolution in XDCR will overwrite the HLV.
+// 1. sg1 creates a unique document cv: 1@rosmar1
+// 2. sg2 creates a unique document cv: 2@rosmar2
+// 3. cbl1 pulls 1@rosmar1
+// 4. cbl2 pulls 2@rosmar2
+// 5. cbs1 pulls 2@rosmar2, overwriting cv:1@rosmar1
+// 6. cbl1 pulls 2@rosmar2, creating cv:2@rosmar2, pv:1@rosmar1
+// Final state:
+// - cv:2@rosmar2 on cbs1, cbs2, cbl2
+// - cv:2@rosmar2, pv:1@rosmar1 on cbl1
 func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
 	t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
 	for _, peer := range topology.SortedPeers() {
 		t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
-		body := peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+		var body db.Body
+		if peer.Type() == PeerTypeCouchbaseLite {
+			body = peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+		} else {
+			body = peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
+		}
 		requireBodyEqual(t, expectedVersion.body, body)
 	}
 }
+
+func waitForConvergingTombstones(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) {
+	t.Logf("waiting for converging tombstones")
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		nonCBLVersions := make(map[string]DocMetadata)
+		CBLVersions := make(map[string]DocMetadata)
+		for peerName, peer := range topology.SortedPeers() {
+			meta, body, exists := peer.GetDocumentIfExists(dsName, docID)
+			if !assert.True(c, exists, "doc %s does not exist on peer %s", docID, peer) {
+				return
+			}
+			if !assert.Nil(c, body, "expected tombstone for doc %s on peer %s", docID, peer) {
+				return
+			}
+			switch peer.Type() {
+			case PeerTypeCouchbaseLite:
+				CBLVersions[peerName] = meta
+			default:
+				nonCBLVersions[peerName] = meta
+			}
+		}
+		var nonCBLVersion *DocMetadata
+		for peer, version := range nonCBLVersions {
+			if nonCBLVersion == nil {
+				nonCBLVersion = &version
+				continue
+			}
+			assertHLVEqual(c, dsName, docID, peer, version, nil, *nonCBLVersion, topology)
+		}
+		// TODO: assert on CBL versions?
+	}, totalWaitTime, pollInterval)
+}
+
+// waitForTombstoneVersion waits for a tombstone document with a particular HLV to be present on all peers.
 func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
 	t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
 	for _, peer := range topology.SortedPeers() {
@@ -78,16 +150,11 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, d
 
 // createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
 // returns the last peer to have a doc created on it
-func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func createConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
-		if peer.Type() == PeerTypeCouchbaseLite {
-			// FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434
-			continue
-		}
 		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topology.specDescription)
 		docVersion := peer.CreateDocument(dsName, docID, []byte(docBody))
-		t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
 	index := len(documentVersion) - 1
@@ -98,12 +165,11 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc
 
 // updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
 // returns the last peer to have a doc updated on it.
-func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func updateConflictingDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topology.specDescription)
 		docVersion := peer.WriteDocument(dsName, docID, []byte(docBody))
-		t.Logf("updateVersion: %#v", docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
 	index := len(documentVersion) - 1
@@ -114,11 +180,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, doc
 
 // deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
 // returns the last peer to have a doc deleted on it
-func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+func deleteConflictDocs(dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
 	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		deleteVersion := peer.DeleteDocument(dsName, docID)
-		t.Logf("deleteVersion: %#v", deleteVersion)
 		documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
 	}
 	index := len(documentVersion) - 1
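With the Couchbase Lite skip removed, a multi-actor conflict test can exercise CBL peers as well. A hedged sketch of the intended flow, assuming it lives in this package; the function name and doc ID are illustrative, not part of this commit.

func multiActorDeleteSketch(t *testing.T, dsName base.ScopeAndCollectionName, topology Topology) {
	const docID = "doc1"

	// Create, update, then delete the same doc on every non-import peer, Couchbase Lite included.
	_ = createConflictingDocs(dsName, docID, topology)
	_ = updateConflictingDocs(dsName, docID, topology)
	_ = deleteConflictDocs(dsName, docID, topology)

	// Deletes race across peers, so rather than asserting one exact HLV the test
	// only requires that all non-CBL peers converge on the same tombstone.
	waitForConvergingTombstones(t, dsName, docID, topology)
}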
