Skip to content

Commit

Permalink
Ensure all goroutines created by StartEtcd to complete before closing…
Browse files Browse the repository at this point in the history
… the errc

Also added commment to clarify the etcd shutting down workflow

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 22, 2025
1 parent c9045d6 commit d12c2cc
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 48 deletions.
104 changes: 72 additions & 32 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,24 @@ type Etcd struct {

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
cfg Config

// closeOnce is to ensure `stopc` is closed only once, no matter
// how many times the Close() method is called.
closeOnce sync.Once
wg sync.WaitGroup
// stopc is used to notify the sub goroutines not to send
// any errors to `errc`.
stopc chan struct{}
// errc is used to receive error from sub goroutines (including
// client handler, peer handler and metrics handler). It's closed
// after all these sub goroutines exit (checked via `wg`). Writers
// should avoid writing after `stopc` is closed by selecting on
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines which
// need to send error back to the `errc`.
wg sync.WaitGroup
}

type peerListener struct {
Expand Down Expand Up @@ -388,6 +400,24 @@ func (e *Etcd) Config() Config {
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaning requests be closed immediately.
//
// The rough workflow to shut down etcd:
// 1. close the `stopc` channel, so that all error handlers (child
// goroutines) won't send back any errors anymore;
// 2. stop the http and grpc servers gracefully, within request timeout;
// 3. close all client and metrics listeners, so that etcd server
// stops receiving any new connection;
// 4. call the cancel function to close the gateway context, so that
// all gateway connections are closed.
// 5. stop etcd server gracefully, and ensure the main raft loop
// goroutine is stopped;
// 6. stop all peer listeners, so that it stops receiving peer connections
// and messages (wait up to 1-second);
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
// and metrics handlers) to exit;
// 8. close the `errc` channel to release the resource. Note that it's only
// safe to close the `errc` after step 7 above is done, otherwise the
// child goroutines may send errors back to already closed `errc` channel.
func (e *Etcd) Close() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
Expand Down Expand Up @@ -607,14 +637,15 @@ func (e *Etcd) servePeers() {

// start peer servers in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
l := pl
e.errHandler(func() error {
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
return l.serve()
})
}
}

Expand Down Expand Up @@ -772,14 +803,36 @@ func (e *Etcd) serveClients() {
}
}

// start client servers in each goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
}(sctx)
s := sctx
e.errHandler(func() error {
return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)
})
}
}

func (e *Etcd) errHandler(handler func() error) {
// start each handler in a separate goroutine
e.wg.Add(1)
go func() {
defer e.wg.Done()

err := handler()
if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
select {
case <-e.stopc:
return
default:

Check warning on line 827 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L827

Added line #L827 was not covered by tests
}
select {
case <-e.stopc:
case e.errc <- err:

Check warning on line 831 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L829-L831

Added lines #L829 - L831 were not covered by tests
}
}()
}

func (e *Etcd) grpcGatewayDial(splitHTTP bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
if !e.cfg.EnableGRPCGateway {
return nil
Expand Down Expand Up @@ -859,36 +912,23 @@ func (e *Etcd) serveMetrics() (err error) {
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
go func(u url.URL, ln net.Listener) {

var (
u = murl
ln = ml
)
e.errHandler(func() error {

Check warning on line 920 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L916-L920

Added lines #L916 - L920 were not covered by tests
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
)
e.errHandler(http.Serve(ln, metricsMux))
}(murl, ml)
return http.Serve(ln, metricsMux)
})

Check warning on line 926 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L925-L926

Added lines #L925 - L926 were not covered by tests
}
}
return nil
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
select {
case <-e.stopc:
return
default:
}
select {
case <-e.stopc:
case e.errc <- err:
}
}

// GetLogger returns the logger.
func (e *Etcd) GetLogger() *zap.Logger {
e.cfg.loggerMu.RLock()
Expand Down
54 changes: 38 additions & 16 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,20 @@ type serveCtx struct {
insecure bool
httpOnly bool

// ctx is used to control the grpc gateway. Terminate the grpc gateway
// by calling `cancel` when shutting down the etcd.
ctx context.Context
cancel context.CancelFunc

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once

// serversC is used to receive the http and grpc server objects (created
// in `serve`), both of which will be closed when shutting down the etcd.
// Close it when `serve` returns or when etcd fails to bootstrap.
serversC chan *servers
// closeOnce is to ensure `serversC` is closed only once.
closeOnce sync.Once
}

type servers struct {
Expand Down Expand Up @@ -97,7 +104,7 @@ func (sctx *serveCtx) serve(
s *etcdserver.EtcdServer,
tlsinfo *transport.TLSInfo,
handler http.Handler,
errHandler func(error),
errHandler func(func() error),
grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error),
splitHTTP bool,
gopts ...grpc.ServerOption,
Expand Down Expand Up @@ -182,15 +189,25 @@ func (sctx *serveCtx) serve(
server = m.Serve

httpl := m.Match(cmux.HTTP1())
go func(srvhttp *http.Server, tlsLis net.Listener) {
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)

var (
srvhttp = srv
tlsLis = httpl
)
errHandler(func() error {
return srvhttp.Serve(tlsLis)
})

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
go func(gs *grpc.Server, l net.Listener) {
errHandler(gs.Serve(l))
}(gs, grpcl)

var (
gs = gs
l = grpcl
)
errHandler(func() error {
return gs.Serve(l)
})
}
}

Expand Down Expand Up @@ -237,7 +254,7 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHTTPServer(srv, s.Cfg); err != nil {
if err = configureHTTPServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
Expand All @@ -248,13 +265,18 @@ func (sctx *serveCtx) serve(
} else {
server = m.Serve

tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if tlsErr != nil {
return tlsErr

Check warning on line 270 in server/embed/serve.go

View check run for this annotation

Codecov / codecov/patch

server/embed/serve.go#L270

Added line #L270 was not covered by tests
}
go func(srvhttp *http.Server, tlsl net.Listener) {
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)

var (
srvhttp = srv
l = tlsl
)
errHandler(func() error {
return srvhttp.Serve(l)
})
}

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand Down

0 comments on commit d12c2cc

Please sign in to comment.