Skip to content

Commit 9326dbd

Browse files
authored
CBG-4174 Force disconnection of blip clients on database close (#7166)
1 parent a6de9a4 commit 9326dbd

File tree

9 files changed

+64
-15
lines changed

9 files changed

+64
-15
lines changed

db/active_replicator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
208208
arc.replicationStats.NumConnectAttempts.Add(1)
209209

210210
var originPatterns []string // no origin headers for ISGR
211-
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns)
211+
// NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently
212+
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil)
212213
if err != nil {
213214
return nil, nil, err
214215
}

db/active_replicator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestBlipSyncErrorUserinfo(t *testing.T) {
6565
srvURL.Path = "/db1"
6666
t.Logf("srvURL: %v", srvURL.String())
6767

68-
blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil)
68+
blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil, nil)
6969
require.NoError(t, err)
7070

7171
_, err = blipSync(*srvURL, blipContext, false)

db/blip.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,15 @@ var (
3636
)
3737

3838
// NewSGBlipContext returns a go-blip context with the given ID, initialized for use in Sync Gateway.
39-
func NewSGBlipContext(ctx context.Context, id string, origin []string) (bc *blip.Context, err error) {
40-
return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols())
39+
func NewSGBlipContext(ctx context.Context, id string, origin []string, cancelCtx context.Context) (bc *blip.Context, err error) {
40+
return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols(), cancelCtx)
4141
}
4242

43-
func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string) (bc *blip.Context, err error) {
43+
func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string, cancelCtx context.Context) (bc *blip.Context, err error) {
4444
opts := blip.ContextOptions{
4545
Origin: origin,
4646
ProtocolIds: protocols,
47+
CancelCtx: cancelCtx,
4748
}
4849
if id == "" {
4950
bc, err = blip.NewContext(opts)

db/database.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,11 @@ type DatabaseContext struct {
115115
LocalJWTProviders auth.LocalJWTProviderMap
116116
ServerUUID string // UUID of the server, if available
117117

118-
DbStats *base.DbStats // stats that correspond to this database context
119-
CompactState uint32 // Status of database compaction
120-
terminator chan bool // Signal termination of background goroutines
121-
118+
DbStats *base.DbStats // stats that correspond to this database context
119+
CompactState uint32 // Status of database compaction
120+
terminator chan bool // Signal termination of background goroutines
121+
CancelContext context.Context // Cancelled when the database is closed - used to notify associated processes (e.g. blipContext)
122+
cancelContextFunc context.CancelFunc // Cancel function for cancelContext
122123
backgroundTasks []BackgroundTask // List of background tasks that are initiated.
123124
activeChannels *channels.ActiveChannels // Tracks active replications by channel
124125
CfgSG cbgt.Cfg // Sync Gateway cluster shared config
@@ -417,6 +418,14 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
417418
UserFunctionTimeout: defaultUserFunctionTimeout,
418419
}
419420

421+
// set up cancellable context based on the background context (context lifecycle for the database
422+
// must be distinct from the request context associated with the db create/update). Used to trigger
423+
// teardown of connected replications on database close.
424+
dbContext.CancelContext, dbContext.cancelContextFunc = context.WithCancel(context.Background())
425+
cleanupFunctions = append(cleanupFunctions, func() {
426+
dbContext.cancelContextFunc()
427+
})
428+
420429
// Check if server version supports multi-xattr operations, required for mou handling
421430
dbContext.EnableMou = bucket.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations)
422431

@@ -591,6 +600,9 @@ func (context *DatabaseContext) Close(ctx context.Context) {
591600

592601
context.OIDCProviders.Stop()
593602
close(context.terminator)
603+
if context.cancelContextFunc != nil {
604+
context.cancelContextFunc()
605+
}
594606

595607
// Stop All background processors
596608
bgManagers := context.stopBackgroundManagers()

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/coreos/go-oidc v2.2.1+incompatible
99
github.com/couchbase/cbgt v1.3.9
1010
github.com/couchbase/clog v0.1.0
11-
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3
11+
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949
1212
github.com/couchbase/gocb/v2 v2.9.1
1313
github.com/couchbase/gocbcore/v10 v10.5.1
1414
github.com/couchbase/gomemcached v0.2.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE=
4444
github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
4545
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
4646
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
47-
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3 h1:MeikDkvUMHZLpS57pfzhu2E+disqUVulUTb/r3aqUck=
48-
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE=
47+
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949 h1:jwFj/GtyaoACmwnGfan/XW38TBTG1kYboXLZfAqd2VE=
48+
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE=
4949
github.com/couchbase/go-couchbase v0.1.1 h1:ClFXELcKj/ojyoTYbsY34QUrrYCBi/1G749sXSCkdhk=
5050
github.com/couchbase/go-couchbase v0.1.1/go.mod h1:+/bddYDxXsf9qt0xpDUtRR47A2GjaXmGGAqQ/k3GJ8A=
5151
github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ=

rest/blip_api_crud_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3167,3 +3167,37 @@ func TestOnDemandImportBlipFailure(t *testing.T) {
31673167
}
31683168
})
31693169
}
3170+
3171+
// TestBlipDatabaseClose verifies that the client connection is closed when the database is closed.
3172+
// Starts a continuous pull replication then updates the db to trigger a close.
3173+
func TestBlipDatabaseClose(t *testing.T) {
3174+
3175+
base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache)
3176+
btcRunner := NewBlipTesterClientRunner(t)
3177+
btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
3178+
rt := NewRestTesterPersistentConfig(t)
3179+
defer rt.Close()
3180+
const username = "alice"
3181+
rt.CreateUser(username, []string{"*"})
3182+
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username})
3183+
var blipContextClosed atomic.Bool
3184+
btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() {
3185+
log.Printf("on exit callback invoked")
3186+
blipContextClosed.Store(true)
3187+
}
3188+
3189+
// put a doc, and make sure blip connection is established
3190+
markerDoc := "markerDoc"
3191+
markerDocVersion := rt.CreateTestDoc(markerDoc)
3192+
require.NoError(t, rt.WaitForPendingChanges())
3193+
require.NoError(t, btcRunner.StartPull(btc.id))
3194+
3195+
btcRunner.WaitForVersion(btc.id, markerDoc, markerDocVersion)
3196+
3197+
RequireStatus(t, rt.SendAdminRequest(http.MethodDelete, "/{{.db}}/", ""), http.StatusOK)
3198+
3199+
require.EventuallyWithT(t, func(c *assert.CollectT) {
3200+
assert.True(c, blipContextClosed.Load())
3201+
}, time.Second*10, time.Millisecond*100)
3202+
})
3203+
}

rest/blip_sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (h *handler) handleBLIPSync() error {
5252
originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin)
5353

5454
// Create a BLIP context:
55-
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns)
55+
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext)
5656
if err != nil {
5757
return err
5858
}

rest/utilities_testing.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,8 +1522,9 @@ func createBlipTesterWithSpec(tb testing.TB, spec BlipTesterSpec, rt *RestTester
15221522
if err != nil {
15231523
return nil, err
15241524
}
1525-
// Make BLIP/Websocket connection
1526-
bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols)
1525+
// Make BLIP/Websocket connection. Not specifying cancellation context here as this is a
1526+
// client blip context that doesn't require cancellation-based close
1527+
bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols, nil)
15271528
if err != nil {
15281529
return nil, err
15291530
}

0 commit comments

Comments
 (0)