Commit e3368ed

Print doc state on all peers on failure (#7670)

1 parent 6396718 commit e3368ed

10 files changed: +409 −215 lines

topologytest/couchbase_lite_mock_peer_test.go (32 additions, 5 deletions)

```diff
@@ -70,6 +70,17 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID
 	return *meta, body
 }
 
+// GetDocumentIfExists returns the latest version of a document.
+func (p *CouchbaseLiteMockPeer) GetDocumentIfExists(dsName sgbucket.DataStoreName, docID string) (m DocMetadata, body *db.Body, exists bool) {
+	bodyBytes, meta := p.getLatestDocVersion(dsName, docID)
+	if meta == nil {
+		return DocMetadata{}, nil, false
+	}
+	require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p)
+	require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body))
+	return *meta, body, true
+}
+
 // getSingleSGBlipClient returns the single blip client for the peer. If there are multiple clients, or no clients it will fail the test. This is temporary to stub support for multiple Sync Gateway peers, see CBG-4433.
 func (p *CouchbaseLiteMockPeer) getSingleSGBlipClient() *PeerBlipTesterClient {
 	// couchbase lite peer can't exist separately from sync gateway peer, CBG-4433
@@ -127,30 +138,46 @@ func (p *CouchbaseLiteMockPeer) DeleteDocument(dsName sgbucket.DataStoreName, do
 }
 
 // WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
-func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, replications Replications) db.Body {
+func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) db.Body {
+	var data []byte
+	require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
+		var actual *DocMetadata
+		data, actual = p.getLatestDocVersion(dsName, docID)
+		if !assert.NotNil(c, actual, "Could not find docID:%+v on %p\nVersion %#v", docID, p, expected) {
+			return
+		}
+		assertHLVEqual(c, dsName, docID, p.name, *actual, data, expected, topology)
+	}, totalWaitTime, pollInterval)
+	var body db.Body
+	require.NoError(p.TB(), base.JSONUnmarshal(data, &body))
+	return body
+}
+
+// WaitForCV waits for a document to reach a specific CV. Returns the state of the document at that version. The test will fail if the document does not reach the expected version in 20s.
+func (p *CouchbaseLiteMockPeer) WaitForCV(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) db.Body {
 	var data []byte
 	require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
 		var actual *DocMetadata
 		data, actual = p.getLatestDocVersion(dsName, docID)
 		if !assert.NotNil(c, actual, "Could not find docID:%+v on %p\nVersion %#v", docID, p, expected) {
 			return
 		}
-		assertHLVEqual(c, docID, p.name, *actual, data, expected, replications)
+		assertCVEqual(c, dsName, docID, p.name, *actual, data, expected, topology)
 	}, totalWaitTime, pollInterval)
 	var body db.Body
 	require.NoError(p.TB(), base.JSONUnmarshal(data, &body))
 	return body
 }
 
 // WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
-func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, replications Replications) {
+func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) {
 	client := p.getSingleSGBlipClient().CollectionClient(dsName)
 	expectedVersion := db.DocVersion{CV: expected.CV(p.TB())}
 	require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
 		isTombstone, err := client.IsVersionTombstone(docID, expectedVersion)
 		require.NoError(c, err)
-		assert.True(c, isTombstone, "expected docID %s on peer %s to be deleted. Replications:\n%s", docID, p, replications.Stats())
-	}, totalWaitTime, pollInterval)
+		assert.True(c, isTombstone, "expected docID %s on peer %s to be deleted.", docID, p)
+	}, totalWaitTime, pollInterval, topology.GetDocState(p.TB(), dsName, docID))
 }
 
 // Close will shut down the peer and close any active replications on the peer.
```
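The change to `WaitForTombstoneVersion` above is the heart of this commit: `topology.GetDocState(p.TB(), dsName, docID)` is passed as a trailing argument to `require.EventuallyWithT`, whose variadic `msgAndArgs` parameters testify appends to the failure output if the wait times out. Note that, by ordinary Go evaluation rules, that argument is computed when the wait starts, not at failure time. A minimal, self-contained sketch of the mechanism (the package, condition, and diagnostics string are placeholders, not harness code):

```go
package topologytest_sketch

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// docVisibleAt stands in for the moment a replicated document becomes
// visible on a peer.
var docVisibleAt = time.Now().Add(50 * time.Millisecond)

// TestEventuallyWithDiagnostics shows testify appending the trailing
// msgAndArgs to the failure message when the wait times out, so expensive
// diagnostics can ride along with the polling assertion.
func TestEventuallyWithDiagnostics(t *testing.T) {
	diagnostics := "per-peer doc state would go here" // placeholder for topology.GetDocState(...)
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.True(c, time.Now().After(docVisibleAt), "doc not yet visible")
	}, time.Second, 10*time.Millisecond, "diagnostics:\n%s", diagnostics)
}
```

Attaching diagnostics this way keeps the polling closure small while still surfacing per-peer document state whenever a replication never converges.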

topologytest/couchbase_server_peer_test.go (55 additions, 7 deletions)

```diff
@@ -109,6 +109,11 @@ func (p *CouchbaseServerPeer) GetDocument(dsName sgbucket.DataStoreName, docID s
 	return getBodyAndVersion(p, p.getCollection(dsName), docID)
 }
 
+// GetDocumentIfExists returns the latest version of a document, if it exists.
+func (p *CouchbaseServerPeer) GetDocumentIfExists(dsName sgbucket.DataStoreName, docID string) (meta DocMetadata, body *db.Body, exists bool) {
+	return getBodyAndVersionIfExists(p, p.getCollection(dsName), docID)
+}
+
 // CreateDocument creates a document on the peer. The test will fail if the document already exists.
 func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion {
 	// create document with xattrs to prevent XDCR from doing a round trip replication in this scenario:
@@ -177,21 +182,29 @@ func (p *CouchbaseServerPeer) DeleteDocument(dsName sgbucket.DataStoreName, docI
 }
 
 // WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
-func (p *CouchbaseServerPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, replications Replications) db.Body {
-	docBytes := p.waitForDocVersion(dsName, docID, expected, replications)
+func (p *CouchbaseServerPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) db.Body {
+	docBytes := p.waitForDocVersion(dsName, docID, expected, topology)
+	var body db.Body
+	require.NoError(p.TB(), base.JSONUnmarshal(docBytes, &body), "couldn't unmarshal docID %s: %s", docID, docBytes)
+	return body
+}
+
+// WaitForCV waits for a document to reach a specific CV. The test will fail if the document does not reach the expected version in 20s.
+func (p *CouchbaseServerPeer) WaitForCV(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) db.Body {
+	docBytes := p.waitForCV(dsName, docID, expected, topology)
 	var body db.Body
 	require.NoError(p.TB(), base.JSONUnmarshal(docBytes, &body), "couldn't unmarshal docID %s: %s", docID, docBytes)
 	return body
 }
 
 // WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
-func (p *CouchbaseServerPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, replications Replications) {
-	docBytes := p.waitForDocVersion(dsName, docID, expected, replications)
-	require.Empty(p.TB(), docBytes, "expected tombstone for docID %s, got %s. Replications:\n%s", docID, docBytes, replications.Stats())
+func (p *CouchbaseServerPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) {
+	docBytes := p.waitForDocVersion(dsName, docID, expected, topology)
+	require.Empty(p.TB(), docBytes, "expected tombstone for docID %s, got %s. %s", docID, docBytes, topology.GetDocState(p.TB(), dsName, docID))
 }
 
 // waitForDocVersion waits for a document to reach a specific version and returns the body in bytes. The bytes will be nil if the document is a tombstone. The test will fail if the document does not reach the expected version in 20s.
-func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, replications Replications) []byte {
+func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) []byte {
 	var docBytes []byte
 	var version DocMetadata
 	require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
@@ -203,7 +216,25 @@ func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, d
 			return
 		}
 		version = getDocVersion(docID, p, cas, xattrs)
-		assertHLVEqual(c, docID, p.name, version, docBytes, expected, replications)
+		assertHLVEqual(c, dsName, docID, p.name, version, docBytes, expected, topology)
+	}, totalWaitTime, pollInterval)
+	return docBytes
+}
+
+// waitForCV waits for a document to reach a specific CV and returns the body in bytes. The bytes will be nil if the document is a tombstone. The test will fail if the document does not reach the expected version in 20s.
+func (p *CouchbaseServerPeer) waitForCV(dsName sgbucket.DataStoreName, docID string, expected DocMetadata, topology Topology) []byte {
+	var docBytes []byte
+	var version DocMetadata
+	require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
+		var err error
+		var xattrs map[string][]byte
+		var cas uint64
+		docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, metadataXattrNames)
+		if !assert.NoError(c, err) {
+			return
+		}
+		version = getDocVersion(docID, p, cas, xattrs)
+		assertCVEqual(c, dsName, docID, p.name, version, docBytes, expected, topology)
 	}, totalWaitTime, pollInterval)
 	return docBytes
 }
@@ -346,3 +377,20 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (
 	require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
 	return getDocVersion(docID, peer, cas, xattrs), body
 }
+
+// getBodyAndVersionIfExists returns the body and version of a document from a sgbucket.DataStore.
+func getBodyAndVersionIfExists(peer Peer, collection sgbucket.DataStore, docID string) (meta DocMetadata, body *db.Body, exists bool) {
+	docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, metadataXattrNames)
+	if base.IsDocNotFoundError(err) {
+		return DocMetadata{}, nil, false
+	}
+	require.NoError(peer.TB(), err)
+	meta = getDocVersion(docID, peer, cas, xattrs)
+	if len(docBytes) == 0 {
+		// document is a tombstone, return empty body
+		return meta, nil, true
+	}
+	// get hlv to construct DocVersion
+	require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
+	return meta, body, true
+}
```
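`getBodyAndVersionIfExists` distinguishes three outcomes: document not found (`exists == false`), tombstone (`exists == true` with a nil body), and a live document. A hedged caller-side sketch of that three-way branch; the `logDocState` helper is hypothetical and assumes the harness `Peer` interface exposes `GetDocumentIfExists`, as both peer types in this commit do:

```go
// logDocState is an illustrative helper, not part of the commit; it branches
// on the comma-ok style result of GetDocumentIfExists without failing the test.
func logDocState(t *testing.T, peer Peer, dsName sgbucket.DataStoreName, docID string) {
	meta, body, exists := peer.GetDocumentIfExists(dsName, docID)
	switch {
	case !exists:
		t.Logf("%s not found on %s", docID, peer) // never written, or purged
	case body == nil:
		t.Logf("%s is a tombstone on %s: %#v", docID, peer, meta) // deleted, metadata remains
	default:
		t.Logf("%s is live on %s: %#v body=%v", docID, peer, meta, *body)
	}
}
```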

topologytest/hlv_test.go (26 additions, 44 deletions)

```diff
@@ -50,53 +50,43 @@ func stripInternalProperties(body db.Body) {
 }
 
 // waitForVersionAndBody waits for a document to reach a specific version on all peers.
-func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, replications Replications, docID string, expectedVersion BodyAndVersion) {
+func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
 	t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
-	for _, peer := range peers.SortedPeers() {
+	for _, peer := range topology.SortedPeers() {
 		t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
-		body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, replications)
+		body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta, topology)
 		requireBodyEqual(t, expectedVersion.body, body)
 	}
 }
 
-func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, replications Replications, docID string, expectedVersion BodyAndVersion) {
-	t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
-	for _, peer := range peers.SortedPeers() {
-		t.Logf("waiting for tombstone version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
-		peer.WaitForTombstoneVersion(dsName, docID, expectedVersion.docMeta, replications)
+// waitForCVAndBody waits for a document to reach a specific CV on all peers.
+func waitForCVAndBody(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
+	t.Logf("waiting for doc version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
+	for _, peer := range topology.SortedPeers() {
+		t.Logf("waiting for doc version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
+		body := peer.WaitForCV(dsName, docID, expectedVersion.docMeta, topology)
+		requireBodyEqual(t, expectedVersion.body, body)
 	}
 }
 
-// removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS
-// so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs)
-func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool {
-	peersToRemove := make(map[string]bool)
-	if peers["sg1"] != nil {
-		// remove the backing store from doc update cycle to avoid conflicts on creating the document in bucket
-		peersToRemove["cbs1"] = true
-	}
-	if peers["sg2"] != nil {
-		// remove the backing store from doc update cycle to avoid conflicts on creating the document in bucket
-		peersToRemove["cbs2"] = true
+func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, docID string, expectedVersion BodyAndVersion, topology Topology) {
+	t.Logf("waiting for tombstone version on all peers, written from %s: %#v", expectedVersion.updatePeer, expectedVersion)
+	for _, peer := range topology.SortedPeers() {
+		t.Logf("waiting for tombstone version on peer %s, written from %s: %#v", peer, expectedVersion.updatePeer, expectedVersion)
+		peer.WaitForTombstoneVersion(dsName, docID, expectedVersion.docMeta, topology)
 	}
-	return peersToRemove
 }
 
 // createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
 // returns the last peer to have a doc created on it
-func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
-	backingPeers := removeSyncGatewayBackingPeers(peers)
-	documentVersion := make([]BodyAndVersion, 0, len(peers))
-	for peerName, peer := range peers {
-		if backingPeers[peerName] {
-			continue
-		}
+func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
+	var documentVersion []BodyAndVersion
+	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		if peer.Type() == PeerTypeCouchbaseLite {
 			// FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434
 			continue
 		}
-		docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topologyDescription))
-		docVersion := peer.CreateDocument(dsName, docID, docBody)
+		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topology.specDescription)
+		docVersion := peer.CreateDocument(dsName, docID, []byte(docBody))
 		t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
@@ -108,15 +98,11 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
 
 // updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
 // returns the last peer to have a doc updated on it.
-func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
-	backingPeers := removeSyncGatewayBackingPeers(peers)
+func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
-	for peerName, peer := range peers {
-		if backingPeers[peerName] {
-			continue
-		}
-		docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topologyDescription))
-		docVersion := peer.WriteDocument(dsName, docID, docBody)
+	for peerName, peer := range topology.peers.NonImportSortedPeers() {
+		docBody := fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topology.specDescription)
+		docVersion := peer.WriteDocument(dsName, docID, []byte(docBody))
 		t.Logf("updateVersion: %#v", docVersion.docMeta)
 		documentVersion = append(documentVersion, docVersion)
 	}
@@ -128,13 +114,9 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
 
 // deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
 // returns the last peer to have a doc deleted on it
-func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) {
-	backingPeers := removeSyncGatewayBackingPeers(peers)
+func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, docID string, topology Topology) (lastWrite BodyAndVersion) {
 	var documentVersion []BodyAndVersion
-	for peerName, peer := range peers {
-		if backingPeers[peerName] {
-			continue
-		}
+	for peerName, peer := range topology.peers.NonImportSortedPeers() {
 		deleteVersion := peer.DeleteDocument(dsName, docID)
 		t.Logf("deleteVersion: %#v", deleteVersion)
 		documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
```
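With `Topology` now threaded through every helper, replacing the ad-hoc `removeSyncGatewayBackingPeers` filtering with `NonImportSortedPeers()`, a multi-actor test reduces to creating conflicting writes and waiting for convergence. A sketch of that flow under the signatures introduced above (the wrapper function and docID are illustrative, not from the commit):

```go
// multiActorCreateExample is an illustrative flow only; it chains the helper
// signatures introduced in this commit.
func multiActorCreateExample(t *testing.T, dsName base.ScopeAndCollectionName, topology Topology) {
	docID := "doc1" // placeholder ID
	// one conflicting create per non-import peer; returns the last write issued
	lastWrite := createConflictingDocs(t, dsName, docID, topology)
	// every peer must converge on a single winner; per-peer doc state is
	// printed on failure via the Topology plumbed into each wait
	waitForVersionAndBody(t, dsName, docID, lastWrite, topology)
}
```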
