Skip to content

Commit a74a86f

Browse files
authored
agones-{extensions,allocator}: Make servers context aware (#3845)
* agones-{extensions,allocator}: Make servers context aware, add gRPC health check

  * Adds an `httpserver` utility package to handle the `Run` function that controller/extensions use. Make that context aware using the same method as https.Run: https://github.com/googleforgames/agones/blob/dfa414e5e4da37798833bbf8c33919acb5f3c2ea/pkg/util/https/server.go#L127-L130
  * Also plumbs context-awareness through the allocator run{Mux,REST,GRPC} functions.
  * Adds a gRPC health server to the allocator, calls .Shutdown() on it during graceful termination - this seems to push the client off correctly.

  Tested with e2e in a loop.

  Towards #3853

* Move from context.Background()
* Use Shutdown/GracefulStop
* Relax deadline slightly (original had none), also delete pod from GetAllocatorClient
1 parent d5027db commit a74a86f

File tree

14 files changed

+522
-149
lines changed

14 files changed

+522
-149
lines changed

cmd/allocator/main.go

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
"google.golang.org/grpc"
3636
"google.golang.org/grpc/codes"
3737
"google.golang.org/grpc/credentials"
38+
grpchealth "google.golang.org/grpc/health"
39+
"google.golang.org/grpc/health/grpc_health_v1"
3840
"google.golang.org/grpc/keepalive"
3941
"google.golang.org/grpc/status"
4042
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -52,6 +54,7 @@ import (
5254
"agones.dev/agones/pkg/gameserverallocations"
5355
"agones.dev/agones/pkg/gameservers"
5456
"agones.dev/agones/pkg/util/fswatch"
57+
"agones.dev/agones/pkg/util/httpserver"
5558
"agones.dev/agones/pkg/util/runtime"
5659
"agones.dev/agones/pkg/util/signals"
5760
)
@@ -218,19 +221,18 @@ func main() {
218221
health, closer := setupMetricsRecorder(conf)
219222
defer closer()
220223

221-
// http.DefaultServerMux is used for http connection, not for https
222-
http.Handle("/", health)
223-
224224
kubeClient, agonesClient, err := getClients(conf)
225225
if err != nil {
226226
logger.WithError(err).Fatal("could not create clients")
227227
}
228228

229+
listenCtx, cancelListenCtx := context.WithCancel(context.Background())
230+
229231
// This will test the connection to agones on each readiness probe
230232
// so if one of the allocator pod can't reach Kubernetes it will be removed
231233
// from the Kubernetes service.
232-
ctx, cancelCtx := context.WithCancel(context.Background())
233234
podReady = true
235+
grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w
234236
health.AddReadinessCheck("allocator-agones-client", func() error {
235237
if !podReady {
236238
return errors.New("asked to shut down, failed readiness check")
@@ -245,16 +247,15 @@ func main() {
245247
signals.NewSigTermHandler(func() {
246248
logger.Info("Pod shutdown has been requested, failing readiness check")
247249
podReady = false
250+
grpcHealth.Shutdown()
248251
time.Sleep(conf.ReadinessShutdownDuration)
249-
cancelCtx()
250-
logger.Infof("Readiness shutdown duration has passed, context cancelled")
251-
time.Sleep(1 * time.Second) // allow a brief time for cleanup, but force exit if main doesn't
252-
os.Exit(0)
252+
cancelListenCtx()
253253
})
254254

255255
grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode)
256256

257-
h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)
257+
workerCtx, cancelWorkerCtx := context.WithCancel(context.Background())
258+
h := newServiceHandler(workerCtx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)
258259

259260
if !h.tlsDisabled {
260261
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
@@ -294,51 +295,62 @@ func main() {
294295

295296
// If grpc and http use the same port then use a mux.
296297
if conf.GRPCPort == conf.HTTPPort {
297-
runMux(h, conf.HTTPPort)
298+
runMux(listenCtx, workerCtx, h, grpcHealth, conf.HTTPPort)
298299
} else {
299300
// Otherwise, run each on a dedicated port.
300301
if validPort(conf.HTTPPort) {
301-
runREST(h, conf.HTTPPort)
302+
runREST(listenCtx, workerCtx, h, conf.HTTPPort)
302303
}
303304
if validPort(conf.GRPCPort) {
304-
runGRPC(h, conf.GRPCPort)
305+
runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort)
305306
}
306307
}
307308

308-
// Finally listen on 8080 (http) and block the main goroutine
309-
// this is used to serve /live and /ready handlers for Kubernetes probes.
310-
err = http.ListenAndServe(":8080", http.DefaultServeMux)
311-
logger.WithError(err).Fatal("allocation service crashed")
309+
// Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes.
310+
healthserver := httpserver.Server{Logger: logger}
311+
healthserver.Handle("/", health)
312+
go func() { _ = healthserver.Run(listenCtx, 0) }()
313+
314+
// TODO: This is messy. Contexts are the wrong way to handle this - we should be using shutdown,
315+
// and a cascading graceful shutdown instead of multiple contexts and sleeps.
316+
<-listenCtx.Done()
317+
logger.Infof("Listen context cancelled")
318+
time.Sleep(5 * time.Second)
319+
cancelWorkerCtx()
320+
logger.Infof("Worker context cancelled")
321+
time.Sleep(1 * time.Second)
322+
logger.Info("Shut down allocator")
312323
}
313324

314325
func validPort(port int) bool {
315326
const maxPort = 65535
316327
return port >= 0 && port < maxPort
317328
}
318329

319-
func runMux(h *serviceHandler, httpPort int) {
330+
func runMux(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) {
320331
logger.Infof("Running the mux handler on port %d", httpPort)
321332
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
322333
pb.RegisterAllocationServiceServer(grpcServer, h)
334+
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)
323335

324336
mux := runtime.NewServerMux()
325337
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
326338
panic(err)
327339
}
328340

329-
runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
341+
runHTTP(listenCtx, workerCtx, h, httpPort, grpcHandlerFunc(grpcServer, mux))
330342
}
331343

332-
func runREST(h *serviceHandler, httpPort int) {
344+
func runREST(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int) {
333345
logger.WithField("port", httpPort).Info("Running the rest handler")
334346
mux := runtime.NewServerMux()
335347
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
336348
panic(err)
337349
}
338-
runHTTP(h, httpPort, mux)
350+
runHTTP(listenCtx, workerCtx, h, httpPort, mux)
339351
}
340352

341-
func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
353+
func runHTTP(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int, handler http.Handler) {
342354
cfg := &tls.Config{}
343355
if !h.tlsDisabled {
344356
cfg.GetCertificate = h.getTLSCert
@@ -356,21 +368,29 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
356368
}
357369

358370
go func() {
371+
go func() {
372+
<-listenCtx.Done()
373+
_ = server.Shutdown(workerCtx)
374+
}()
375+
359376
var err error
360377
if !h.tlsDisabled {
361378
err = server.ListenAndServeTLS("", "")
362379
} else {
363380
err = server.ListenAndServe()
364381
}
365382

366-
if err != nil {
383+
if err == http.ErrServerClosed {
384+
logger.WithError(err).Info("HTTP/HTTPS server closed")
385+
os.Exit(0)
386+
} else {
367387
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
368388
os.Exit(1)
369389
}
370390
}()
371391
}
372392

373-
func runGRPC(h *serviceHandler, grpcPort int) {
393+
func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, grpcPort int) {
374394
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
375395
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
376396
if err != nil {
@@ -380,11 +400,22 @@ func runGRPC(h *serviceHandler, grpcPort int) {
380400

381401
grpcServer := grpc.NewServer(h.getGRPCServerOptions()...)
382402
pb.RegisterAllocationServiceServer(grpcServer, h)
403+
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)
383404

384405
go func() {
406+
go func() {
407+
<-ctx.Done()
408+
grpcServer.GracefulStop()
409+
}()
410+
385411
err := grpcServer.Serve(listener)
386-
logger.WithError(err).Fatal("allocation service crashed")
387-
os.Exit(1)
412+
if err != nil {
413+
logger.WithError(err).Fatal("allocation service crashed")
414+
os.Exit(1)
415+
} else {
416+
logger.Info("allocation server closed")
417+
os.Exit(0)
418+
}
388419
}()
389420
}
390421

cmd/controller/main.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"encoding/json"
2121
"fmt"
2222
"io"
23-
"net/http"
2423
"os"
2524
"path/filepath"
2625
"strings"
@@ -54,6 +53,7 @@ import (
5453
"agones.dev/agones/pkg/gameservers"
5554
"agones.dev/agones/pkg/gameserversets"
5655
"agones.dev/agones/pkg/metrics"
56+
"agones.dev/agones/pkg/util/httpserver"
5757
"agones.dev/agones/pkg/util/runtime"
5858
"agones.dev/agones/pkg/util/signals"
5959
)
@@ -171,7 +171,7 @@ func main() {
171171
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
172172
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
173173

174-
server := &httpServer{}
174+
server := &httpserver.Server{Logger: logger}
175175
var rs []runner
176176
var health healthcheck.Handler
177177

@@ -547,10 +547,6 @@ type runner interface {
547547
Run(ctx context.Context, workers int) error
548548
}
549549

550-
type httpServer struct {
551-
http.ServeMux
552-
}
553-
554550
func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) {
555551
if !doLeaderElection {
556552
start(ctx)
@@ -600,22 +596,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E
600596
},
601597
})
602598
}
603-
604-
func (h *httpServer) Run(_ context.Context, _ int) error {
605-
logger.Info("Starting http server...")
606-
srv := &http.Server{
607-
Addr: ":8080",
608-
Handler: h,
609-
}
610-
defer srv.Close() // nolint: errcheck
611-
612-
if err := srv.ListenAndServe(); err != nil {
613-
if err == http.ErrServerClosed {
614-
logger.WithError(err).Info("http server closed")
615-
} else {
616-
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
617-
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
618-
}
619-
}
620-
return nil
621-
}

cmd/extensions/main.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package main
1818
import (
1919
"context"
2020
"io"
21-
"net/http"
2221
"os"
2322
"path/filepath"
2423
"strings"
@@ -46,6 +45,7 @@ import (
4645
"agones.dev/agones/pkg/metrics"
4746
"agones.dev/agones/pkg/util/apiserver"
4847
"agones.dev/agones/pkg/util/https"
48+
"agones.dev/agones/pkg/util/httpserver"
4949
"agones.dev/agones/pkg/util/runtime"
5050
"agones.dev/agones/pkg/util/signals"
5151
"agones.dev/agones/pkg/util/webhooks"
@@ -150,7 +150,7 @@ func main() {
150150
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
151151
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
152152

153-
server := &httpServer{}
153+
server := &httpserver.Server{Logger: logger}
154154
var health healthcheck.Handler
155155

156156
// Stackdriver metrics
@@ -340,26 +340,3 @@ type config struct {
340340
type runner interface {
341341
Run(ctx context.Context, workers int) error
342342
}
343-
344-
type httpServer struct {
345-
http.ServeMux
346-
}
347-
348-
func (h *httpServer) Run(_ context.Context, _ int) error {
349-
logger.Info("Starting http server...")
350-
srv := &http.Server{
351-
Addr: ":8080",
352-
Handler: h,
353-
}
354-
defer srv.Close() // nolint: errcheck
355-
356-
if err := srv.ListenAndServe(); err != nil {
357-
if err == http.ErrServerClosed {
358-
logger.WithError(err).Info("http server closed")
359-
} else {
360-
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
361-
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
362-
}
363-
}
364-
return nil
365-
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
contrib.go.opencensus.io/exporter/stackdriver v0.8.0
1010
fortio.org/fortio v1.3.1
1111
github.com/ahmetb/gen-crd-api-reference-docs v0.3.0
12+
github.com/evanphx/json-patch v4.12.0+incompatible
1213
github.com/fsnotify/fsnotify v1.6.0
1314
github.com/go-openapi/spec v0.19.5
1415
github.com/google/go-cmp v0.5.9
@@ -23,7 +24,6 @@ require (
2324
github.com/pkg/errors v0.9.1
2425
github.com/prometheus/client_golang v1.16.0
2526
github.com/sirupsen/logrus v1.9.0
26-
github.com/spf13/cast v1.3.0
2727
github.com/spf13/pflag v1.0.5
2828
github.com/spf13/viper v1.7.0
2929
github.com/stretchr/testify v1.8.2
@@ -60,7 +60,6 @@ require (
6060
github.com/cespare/xxhash/v2 v2.2.0 // indirect
6161
github.com/davecgh/go-spew v1.1.1 // indirect
6262
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
63-
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
6463
github.com/go-logr/logr v1.2.4 // indirect
6564
github.com/go-openapi/jsonpointer v0.19.6 // indirect
6665
github.com/go-openapi/jsonreference v0.20.2 // indirect
@@ -92,6 +91,7 @@ require (
9291
github.com/prometheus/procfs v0.10.1 // indirect
9392
github.com/russross/blackfriday/v2 v2.1.0 // indirect
9493
github.com/spf13/afero v1.9.2 // indirect
94+
github.com/spf13/cast v1.3.0 // indirect
9595
github.com/spf13/jwalterweatherman v1.0.0 // indirect
9696
github.com/subosito/gotenv v1.2.0 // indirect
9797
golang.org/x/crypto v0.21.0 // indirect

pkg/util/https/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const (
3333

3434
// tls is a http server interface to enable easier testing
3535
type tls interface {
36-
Close() error
36+
Shutdown(context.Context) error
3737
ListenAndServeTLS(certFile, keyFile string) error
3838
}
3939

@@ -126,7 +126,7 @@ func (s *Server) WatchForCertificateChanges() (func(), error) {
126126
func (s *Server) Run(ctx context.Context, _ int) error {
127127
go func() {
128128
<-ctx.Done()
129-
s.tls.Close() // nolint: errcheck,gosec
129+
_ = s.tls.Shutdown(context.Background())
130130
}()
131131

132132
s.logger.WithField("server", s).Infof("https server started")

pkg/util/https/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type testServer struct {
2727
server *httptest.Server
2828
}
2929

30-
func (ts *testServer) Close() error {
30+
func (ts *testServer) Shutdown(_ context.Context) error {
3131
ts.server.Close()
3232
return nil
3333
}

0 commit comments

Comments
 (0)