diff --git a/server/embed/etcd.go b/server/embed/etcd.go index fb610e0f42d..18811624854 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -94,7 +94,8 @@ type Etcd struct { // reading from `stopc`. errc chan error - // wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`. + // wg is used to track the lifecycle of all sub goroutines which + // need to send error back to the `errc`. wg sync.WaitGroup } @@ -636,16 +637,15 @@ func (e *Etcd) servePeers() { // start peer servers in a goroutine for _, pl := range e.Peers { - e.wg.Add(1) - go func(l *peerListener) { - defer e.wg.Done() + l := pl + e.startHandler(func() error { u := l.Addr().String() e.cfg.logger.Info( "serving peer traffic", zap.String("address", u), ) - e.errHandler(l.serve()) - }(pl) + return l.serve() + }) } } @@ -805,11 +805,10 @@ func (e *Etcd) serveClients() { // start client servers in each goroutine for _, sctx := range e.sctxs { - e.wg.Add(1) - go func(s *serveCtx) { - defer e.wg.Done() - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)) - }(sctx) + s := sctx + e.startHandler(func() error { + return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...) + }) } } @@ -887,25 +886,34 @@ func (e *Etcd) serveMetrics() (err error) { etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server) for _, murl := range e.cfg.ListenMetricsUrls { + u := murl ml, err := e.createMetricsListener(murl) if err != nil { return err } e.metricsListeners = append(e.metricsListeners, ml) - e.wg.Add(1) - go func(u url.URL, ln net.Listener) { - defer e.wg.Done() + + e.startHandler(func() error { e.cfg.logger.Info( "serving metrics", zap.String("address", u.String()), ) - e.errHandler(http.Serve(ln, metricsMux)) - }(murl, ml) + return http.Serve(ml, metricsMux) + }) } } return nil } +func (e *Etcd) startHandler(handler func() error) { + // start each handler in a separate goroutine + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.errHandler(handler()) + }() +} + func (e *Etcd) errHandler(err error) { if err != nil { e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err)) diff --git a/server/embed/serve.go b/server/embed/serve.go index ad7307666b0..e50529dfe43 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -80,6 +80,18 @@ type serveCtx struct { wg sync.WaitGroup } +func (sctx *serveCtx) startHandler(errHandler func(error), handler func() error) { + // start each handler in a separate goroutine + sctx.wg.Add(1) + go func() { + defer sctx.wg.Done() + err := handler() + if errHandler != nil { + errHandler(err) + } + }() +} + type servers struct { secure bool grpc *grpc.Server @@ -192,19 +204,15 @@ func (sctx *serveCtx) serve( server = m.Serve httpl := m.Match(cmux.HTTP1()) - sctx.wg.Add(1) - go func(srvhttp *http.Server, tlsLis net.Listener) { - defer sctx.wg.Done() - errHandler(srvhttp.Serve(tlsLis)) - }(srv, httpl) + sctx.startHandler(errHandler, func() error { + return srv.Serve(httpl) + }) if grpcEnabled { grpcl := m.Match(cmux.HTTP2()) - sctx.wg.Add(1) - go func(gs *grpc.Server, l net.Listener) { - defer sctx.wg.Done() - errHandler(gs.Serve(l)) - }(gs, grpcl) + sctx.startHandler(errHandler, func() error { + return gs.Serve(grpcl) + }) } } @@ -266,11 +274,9 @@ func (sctx *serveCtx) serve( if tlsErr != nil { return tlsErr } - sctx.wg.Add(1) - go func(srvhttp *http.Server, tlsl net.Listener) { - defer sctx.wg.Done() - errHandler(srvhttp.Serve(tlsl)) - }(srv, tlsl) + sctx.startHandler(errHandler, func() error { + return srv.Serve(tlsl) + }) } sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -283,7 +289,6 @@ func (sctx *serveCtx) serve( err = server() sctx.close() - // ensure all goroutines, which are created by this method, to complete before this method returns. sctx.wg.Wait() return err } @@ -354,9 +359,7 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie return nil, err } } - sctx.wg.Add(1) - go func() { - defer sctx.wg.Done() + sctx.startHandler(nil, func() error { <-ctx.Done() if cerr := conn.Close(); cerr != nil { sctx.lg.Warn( @@ -365,7 +368,8 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie zap.Error(cerr), ) } - }() + return nil + }) return gwmux, nil }