Skip to content

[3.2.1 Backport] CBG-4151: Memory-based rev cache size #7134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
StatAddedVersion3dot1dot3dot1 = "3.1.3.1"
StatAddedVersion3dot1dot4 = "3.1.4"
StatAddedVersion3dot2dot0 = "3.2.0"
StatAddedVersion3dot2dot1 = "3.2.1"
StatAddedVersion3dot3dot0 = "3.3.0"

StatDeprecatedVersionNotDeprecated = ""
Expand Down Expand Up @@ -425,12 +426,16 @@ type CacheStats struct {
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
// Total number of items in the rev cache
RevisionCacheNumItems *SgwIntStat `json:"revision_cache_num_items"`
// The total number of revision cache bypass operations performed.
RevisionCacheBypass *SgwIntStat `json:"rev_cache_bypass"`
// The total number of revision cache hits.
RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"`
// The total number of revision cache misses.
RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"`
// Total memory used by the rev cache
RevisionCacheTotalMemory *SgwIntStat `json:"revision_cache_total_memory"`
// The current length of the pending skipped sequence slice.
SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// The current capacity of the skipped sequence slice
Expand Down Expand Up @@ -1320,6 +1325,10 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.RevisionCacheNumItems, err = NewIntStat(SubsystemCacheKey, "revision_cache_num_items", StatUnitNoUnits, RevCacheNumItemsDesc, StatAddedVersion3dot2dot1, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.RevisionCacheBypass, err = NewIntStat(SubsystemCacheKey, "rev_cache_bypass", StatUnitNoUnits, RevCacheBypassDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
Expand All @@ -1332,6 +1341,10 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.RevisionCacheTotalMemory, err = NewIntStat(SubsystemCacheKey, "revision_cache_total_memory", StatUnitNoUnits, RevCacheMemoryDesc, StatAddedVersion3dot2dot1, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.SkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", StatUnitNoUnits, SkippedSeqLengthDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
Expand Down Expand Up @@ -1377,9 +1390,11 @@ func (d *DbStats) unregisterCacheStats() {
prometheus.Unregister(d.CacheStats.SkippedSeqCap)
prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped)
prometheus.Unregister(d.CacheStats.PendingSeqLen)
prometheus.Unregister(d.CacheStats.RevisionCacheNumItems)
prometheus.Unregister(d.CacheStats.RevisionCacheBypass)
prometheus.Unregister(d.CacheStats.RevisionCacheHits)
prometheus.Unregister(d.CacheStats.RevisionCacheMisses)
prometheus.Unregister(d.CacheStats.RevisionCacheTotalMemory)
prometheus.Unregister(d.CacheStats.SkippedSeqLen)
prometheus.Unregister(d.CacheStats.ViewQueries)
}
Expand Down
4 changes: 4 additions & 0 deletions base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ const (

PendingSeqLengthDesc = "The total number of pending sequences. These are out-of-sequence entries waiting to be cached."

RevCacheNumItemsDesc = "The total number of items in the revision cache."

RevCacheBypassDesc = "The total number of revision cache bypass operations performed."

RevCacheHitsDesc = "The total number of revision cache hits. This metric can be used to calculate the ratio of revision cache hits: " +
Expand All @@ -127,6 +129,8 @@ const (
RevCacheMissesDesc = "The total number of revision cache misses. This metric can be used to calculate the ratio of revision cache misses: " +
"Rev Cache Miss Ratio = rev_cache_misses / (rev_cache_hits + rev_cache_misses)"

RevCacheMemoryDesc = "The approximation of total memory taken up by rev cache for documents. This is measured by the raw document body, the channels allocated to a document and its revision history."

SkippedSeqLengthDesc = "The current length of the pending skipped sequence slice."

SkippedSeqCapDesc = "The current capacity of the skipped sequence slice."
Expand Down
4 changes: 2 additions & 2 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e
}

func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx context.Context, entry *ChangeEntry, revID string) (err error) {
rev, err := db.getRev(ctx, entry.ID, revID, 0, nil, RevCacheIncludeBody)
rev, err := db.getRev(ctx, entry.ID, revID, 0, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (db *DatabaseCollectionWithUser) buildRevokedFeed(ctx context.Context, ch c

// UserHasDocAccess checks whether the user has access to the active revision of the document
func UserHasDocAccess(ctx context.Context, collection *DatabaseCollectionWithUser, docID string) (bool, error) {
currentRev, err := collection.revisionCache.GetActive(ctx, docID, false)
currentRev, err := collection.revisionCache.GetActive(ctx, docID)
if err != nil {
if base.IsDocNotFoundError(err) {
return false, nil
Expand Down
51 changes: 24 additions & 27 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (db *DatabaseCollectionWithUser) GetRev(ctx context.Context, docID, revID s
if history {
maxHistory = math.MaxInt32
}
return db.getRev(ctx, docID, revID, maxHistory, nil, RevCacheOmitBody)
return db.getRev(ctx, docID, revID, maxHistory, nil)
}

// Returns the body of the current revision of a document
Expand All @@ -278,7 +278,7 @@ func (db *DatabaseCollectionWithUser) Get1xRevBody(ctx context.Context, docid, r

// Retrieves rev with request history specified as collection of revids (historyFrom)
func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string, attachmentsSince []string, showExp bool) (Body, error) {
rev, err := db.getRev(ctx, docid, revid, maxHistory, historyFrom, RevCacheIncludeBody)
rev, err := db.getRev(ctx, docid, revid, maxHistory, historyFrom)
if err != nil {
return nil, err
}
Expand All @@ -305,14 +305,14 @@ func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Contex
// - attachmentsSince is nil to return no attachment bodies, otherwise a (possibly empty) list of
// revisions for which the client already has attachments and doesn't need bodies. Any attachment
// that hasn't changed since one of those revisions will be returned as a stub.
func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string, includeBody bool) (revision DocumentRevision, err error) {
func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string) (revision DocumentRevision, err error) {
if revid != "" {
// Get a specific revision body and history from the revision cache
// (which will load them if necessary, by calling revCacheLoader, above)
revision, err = db.revisionCache.Get(ctx, docid, revid, includeBody, RevCacheOmitDelta)
revision, err = db.revisionCache.Get(ctx, docid, revid, RevCacheOmitDelta)
} else {
// No rev ID given, so load active revision
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
revision, err = db.revisionCache.GetActive(ctx, docid)
}

if err != nil {
Expand Down Expand Up @@ -373,7 +373,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, nil
}

fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheOmitBody, RevCacheIncludeDelta)
fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta)

// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
// return 404 missing to indicate that the body of the revision is no longer available.
Expand Down Expand Up @@ -413,7 +413,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR

// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheMisses, 1)
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheOmitBody, RevCacheIncludeDelta)
toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -549,38 +549,36 @@ func (col *DatabaseCollectionWithUser) authorizeDoc(doc *Document, revid string)

// Gets a revision of a document. If it's obsolete it will be loaded from the database if possible.
// inline "_attachments" properties in the body will be extracted and returned separately if present (pre-2.5 metadata, or backup revisions)
func (c *DatabaseCollection) getRevision(ctx context.Context, doc *Document, revid string) (bodyBytes []byte, body Body, attachments AttachmentsMeta, err error) {
func (c *DatabaseCollection) getRevision(ctx context.Context, doc *Document, revid string) (bodyBytes []byte, attachments AttachmentsMeta, err error) {
bodyBytes = doc.getRevisionBodyJSON(ctx, revid, c.RevisionBodyLoader)

// No inline body, so look for separate doc:
if bodyBytes == nil {
if !doc.History.contains(revid) {
return nil, nil, nil, ErrMissing
return nil, nil, ErrMissing
}

bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, revid)
if err != nil || bodyBytes == nil {
return nil, nil, nil, err
return nil, nil, err
}
}

// optimistically grab the doc body and to store as a pre-unmarshalled version, as well as anticipating no inline attachments.
if doc.CurrentRev == revid {
body = doc._body
attachments = doc.Attachments
}

// handle backup revision inline attachments, or pre-2.5 meta
if inlineAtts, cleanBodyBytes, cleanBody, err := extractInlineAttachments(bodyBytes); err != nil {
return nil, nil, nil, err
if inlineAtts, cleanBodyBytes, _, err := extractInlineAttachments(bodyBytes); err != nil {
return nil, nil, err
} else if len(inlineAtts) > 0 {
// we found some inline attachments, so merge them with attachments, and update the bodies
attachments = mergeAttachments(inlineAtts, attachments)
bodyBytes = cleanBodyBytes
body = cleanBody
}

return bodyBytes, body, attachments, nil
return bodyBytes, attachments, nil
}

// mergeAttachments copies the docAttachments map, and merges pre25Attachments into it.
Expand Down Expand Up @@ -705,7 +703,7 @@ func (db *DatabaseCollectionWithUser) get1xRevFromDoc(ctx context.Context, doc *
return nil, false, ErrDeleted
}
}
if bodyBytes, _, attachments, err = db.getRevision(ctx, doc, revid); err != nil {
if bodyBytes, attachments, err = db.getRevision(ctx, doc, revid); err != nil {
return nil, false, err
}
}
Expand Down Expand Up @@ -742,7 +740,7 @@ func (db *DatabaseCollectionWithUser) get1xRevFromDoc(ctx context.Context, doc *
// Returns the body and rev ID of the asked-for revision or the most recent available ancestor.
func (db *DatabaseCollectionWithUser) getAvailableRev(ctx context.Context, doc *Document, revid string) ([]byte, string, AttachmentsMeta, error) {
for ; revid != ""; revid = doc.History[revid].Parent {
if bodyBytes, _, attachments, _ := db.getRevision(ctx, doc, revid); bodyBytes != nil {
if bodyBytes, attachments, _ := db.getRevision(ctx, doc, revid); bodyBytes != nil {
return bodyBytes, revid, attachments, nil
}
}
Expand Down Expand Up @@ -2155,15 +2153,14 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

revChannels := doc.History[newRevID].Channels
documentRevision := DocumentRevision{
DocID: docid,
RevID: newRevID,
BodyBytes: storedDocBytes,
History: encodeRevisions(ctx, docid, history),
Channels: revChannels,
Attachments: doc.Attachments,
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
_shallowCopyBody: storedDoc.Body(ctx),
DocID: docid,
RevID: newRevID,
BodyBytes: storedDocBytes,
History: encodeRevisions(ctx, docid, history),
Channels: revChannels,
Attachments: doc.Attachments,
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
}

if createNewRevIDSkipped {
Expand Down Expand Up @@ -2261,7 +2258,7 @@ func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectio
})

for _, leafRevision := range documentLeafRevisions {
_, _, attachmentMeta, err := db.getRevision(ctx, doc, leafRevision)
_, attachmentMeta, err := db.getRevision(ctx, doc, leafRevision)
if err != nil {
return nil, err
}
Expand Down
88 changes: 81 additions & 7 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,13 @@ func TestGetRemovedAsUser(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -514,9 +519,11 @@ func TestGetRemovalMultiChannel(t *testing.T) {
_, rev1Digest := ParseRevID(ctx, rev1ID)
_, rev2Digest := ParseRevID(ctx, rev2ID)

var interfaceListChannels []interface{}
interfaceListChannels = append(interfaceListChannels, "ABC")
bodyExpected := Body{
"k2": "v2",
"channels": []string{"ABC"},
"channels": interfaceListChannels,
BodyRevisions: Revisions{
RevisionsStart: 2,
RevisionsIds: []string{rev2Digest, rev1Digest},
Expand Down Expand Up @@ -752,8 +759,13 @@ func TestGetRemoved(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -821,8 +833,13 @@ func TestGetRemovedAndDeleted(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -3321,3 +3338,60 @@ func TestBadDCPStart(t *testing.T) {

dbCtx.Close(ctx)
}

func TestInject1xBodyProperties(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

rev1ID, _, err := collection.Put(ctx, "doc", Body{"test": "doc"})
require.NoError(t, err)
var rev2Body Body
rev2Data := `{"key":"value", "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}`
require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body))
_, rev2ID, err := collection.PutExistingRevWithBody(ctx, "doc", rev2Body, []string{"2-abc", rev1ID}, true)
require.NoError(t, err)

docRev, err := collection.GetRev(ctx, "doc", rev2ID, true, nil)
require.NoError(t, err)

// mock expiry on doc
exp := time.Now()
docRev.Expiry = &exp

newDoc, err := docRev.Inject1xBodyProperties(ctx, collection, docRev.History, nil, true)
require.NoError(t, err)
var resBody Body
require.NoError(t, resBody.Unmarshal(newDoc))

// cast to map of interface given we have injected the properties runtime has no concept of the AttachmentMeta and Revisions types
revs := resBody[BodyRevisions].(map[string]interface{})
atts := resBody[BodyAttachments].(map[string]interface{})

assert.NotNil(t, atts)
assert.NotNil(t, revs)
assert.Equal(t, "doc", resBody[BodyId])
assert.Equal(t, "2-abc", resBody[BodyRev])
assert.Equal(t, exp.Format(time.RFC3339), resBody[BodyExpiry])
assert.Equal(t, "value", resBody["key"])

// mock doc deleted
docRev.Deleted = true

newDoc, err = docRev.Inject1xBodyProperties(ctx, collection, docRev.History, []string{"2-abc"}, true)
require.NoError(t, err)
require.NoError(t, resBody.Unmarshal(newDoc))

// cast to map of interface given we have injected the properties runtime has no concept of the AttachmentMeta and Revisions types
revs = resBody[BodyRevisions].(map[string]interface{})
atts = resBody[BodyAttachments].(map[string]interface{})

assert.NotNil(t, atts)
assert.NotNil(t, revs)
assert.Equal(t, "doc", resBody[BodyId])
assert.Equal(t, "2-abc", resBody[BodyRev])
assert.Equal(t, exp.Format(time.RFC3339), resBody[BodyExpiry])
assert.Equal(t, "value", resBody["key"])
assert.True(t, resBody[BodyDeleted].(bool))
}
Loading
Loading