From d12c2cc6cce302bb7691d15e40f5ea0a65595159 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 17 Jan 2025 19:16:12 +0000 Subject: [PATCH] Ensure all goroutines created by StartEtcd to complete before closing the errc Also added commment to clarify the etcd shutting down workflow Signed-off-by: Benjamin Wang --- server/embed/etcd.go | 104 +++++++++++++++++++++++++++++------------- server/embed/serve.go | 54 +++++++++++++++------- 2 files changed, 110 insertions(+), 48 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index f5fa2c2c692..6cb7ac5015f 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -79,12 +79,24 @@ type Etcd struct { Server *etcdserver.EtcdServer - cfg Config - stopc chan struct{} - errc chan error + cfg Config + // closeOnce is to ensure `stopc` is closed only once, no matter + // how many times the Close() method is called. closeOnce sync.Once - wg sync.WaitGroup + // stopc is used to notify the sub goroutines not to send + // any errors to `errc`. + stopc chan struct{} + // errc is used to receive error from sub goroutines (including + // client handler, peer handler and metrics handler). It's closed + // after all these sub goroutines exit (checked via `wg`). Writers + // should avoid writing after `stopc` is closed by selecting on + // reading from `stopc`. + errc chan error + + // wg is used to track the lifecycle of all sub goroutines which + // need to send error back to the `errc`. + wg sync.WaitGroup } type peerListener struct { @@ -388,6 +400,24 @@ func (e *Etcd) Config() Config { // Close gracefully shuts down all servers/listeners. // Client requests will be terminated with request timeout. // After timeout, enforce remaning requests be closed immediately. +// +// The rough workflow to shut down etcd: +// 1. close the `stopc` channel, so that all error handlers (child +// goroutines) won't send back any errors anymore; +// 2. stop the http and grpc servers gracefully, within request timeout; +// 3. close all client and metrics listeners, so that etcd server +// stops receiving any new connection; +// 4. call the cancel function to close the gateway context, so that +// all gateway connections are closed. +// 5. stop etcd server gracefully, and ensure the main raft loop +// goroutine is stopped; +// 6. stop all peer listeners, so that it stops receiving peer connections +// and messages (wait up to 1-second); +// 7. wait for all child goroutines (i.e. client handlers, peer handlers +// and metrics handlers) to exit; +// 8. close the `errc` channel to release the resource. Note that it's only +// safe to close the `errc` after step 7 above is done, otherwise the +// child goroutines may send errors back to already closed `errc` channel. func (e *Etcd) Close() { fields := []zap.Field{ zap.String("name", e.cfg.Name), @@ -607,14 +637,15 @@ func (e *Etcd) servePeers() { // start peer servers in a goroutine for _, pl := range e.Peers { - go func(l *peerListener) { + l := pl + e.errHandler(func() error { u := l.Addr().String() e.cfg.logger.Info( "serving peer traffic", zap.String("address", u), ) - e.errHandler(l.serve()) - }(pl) + return l.serve() + }) } } @@ -772,14 +803,36 @@ func (e *Etcd) serveClients() { } } - // start client servers in each goroutine for _, sctx := range e.sctxs { - go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)) - }(sctx) + s := sctx + e.errHandler(func() error { + return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...) + }) } } +func (e *Etcd) errHandler(handler func() error) { + // start each handler in a separate goroutine + e.wg.Add(1) + go func() { + defer e.wg.Done() + + err := handler() + if err != nil { + e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err)) + } + select { + case <-e.stopc: + return + default: + } + select { + case <-e.stopc: + case e.errc <- err: + } + }() +} + func (e *Etcd) grpcGatewayDial(splitHTTP bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { if !e.cfg.EnableGRPCGateway { return nil @@ -859,36 +912,23 @@ func (e *Etcd) serveMetrics() (err error) { return err } e.metricsListeners = append(e.metricsListeners, ml) - go func(u url.URL, ln net.Listener) { + + var ( + u = murl + ln = ml + ) + e.errHandler(func() error { e.cfg.logger.Info( "serving metrics", zap.String("address", u.String()), ) - e.errHandler(http.Serve(ln, metricsMux)) - }(murl, ml) + return http.Serve(ln, metricsMux) + }) } } return nil } -func (e *Etcd) errHandler(err error) { - e.wg.Add(1) - defer e.wg.Done() - - if err != nil { - e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err)) - } - select { - case <-e.stopc: - return - default: - } - select { - case <-e.stopc: - case e.errc <- err: - } -} - // GetLogger returns the logger. func (e *Etcd) GetLogger() *zap.Logger { e.cfg.loggerMu.RLock() diff --git a/server/embed/serve.go b/server/embed/serve.go index c888e3b1cfb..80a5b4f28ee 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -61,13 +61,20 @@ type serveCtx struct { insecure bool httpOnly bool + // ctx is used to control the grpc gateway. Terminate the grpc gateway + // by calling `cancel` when shutting down the etcd. ctx context.Context cancel context.CancelFunc userHandlers map[string]http.Handler serviceRegister func(*grpc.Server) - serversC chan *servers - closeOnce sync.Once + + // serversC is used to receive the http and grpc server objects (created + // in `serve`), both of which will be closed when shutting down the etcd. + // Close it when `serve` returns or when etcd fails to bootstrap. + serversC chan *servers + // closeOnce is to ensure `serversC` is closed only once. + closeOnce sync.Once } type servers struct { @@ -97,7 +104,7 @@ func (sctx *serveCtx) serve( s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, - errHandler func(error), + errHandler func(func() error), grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), splitHTTP bool, gopts ...grpc.ServerOption, @@ -182,15 +189,25 @@ func (sctx *serveCtx) serve( server = m.Serve httpl := m.Match(cmux.HTTP1()) - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, httpl) + + var ( + srvhttp = srv + tlsLis = httpl + ) + errHandler(func() error { + return srvhttp.Serve(tlsLis) + }) if grpcEnabled { grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, l net.Listener) { - errHandler(gs.Serve(l)) - }(gs, grpcl) + + var ( + gs = gs + l = grpcl + ) + errHandler(func() error { + return gs.Serve(l) + }) } } @@ -237,7 +254,7 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } - if err := configureHTTPServer(srv, s.Cfg); err != nil { + if err = configureHTTPServer(srv, s.Cfg); err != nil { sctx.lg.Error("Configure https server failed", zap.Error(err)) return err } @@ -248,13 +265,18 @@ func (sctx *serveCtx) serve( } else { server = m.Serve - tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) - if err != nil { - return err + tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if tlsErr != nil { + return tlsErr } - go func(srvhttp *http.Server, tlsl net.Listener) { - errHandler(srvhttp.Serve(tlsl)) - }(srv, tlsl) + + var ( + srvhttp = srv + l = tlsl + ) + errHandler(func() error { + return srvhttp.Serve(l) + }) } sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}