Skip to content

Commit

Permalink
refactor the error handler
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 22, 2025
1 parent 363b166 commit f330406
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 36 deletions.
40 changes: 24 additions & 16 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ type Etcd struct {
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
// wg is used to track the lifecycle of all sub goroutines which
// need to send error back to the `errc`.
wg sync.WaitGroup
}

Expand Down Expand Up @@ -636,16 +637,15 @@ func (e *Etcd) servePeers() {

// start peer servers in a goroutine
for _, pl := range e.Peers {
e.wg.Add(1)
go func(l *peerListener) {
defer e.wg.Done()
l := pl
e.startHandler(func() error {
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
return l.serve()
})
}
}

Expand Down Expand Up @@ -805,11 +805,10 @@ func (e *Etcd) serveClients() {

// start client servers in each goroutine
for _, sctx := range e.sctxs {
e.wg.Add(1)
go func(s *serveCtx) {
defer e.wg.Done()
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
}(sctx)
s := sctx
e.startHandler(func() error {
return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)
})
}
}

Expand Down Expand Up @@ -887,25 +886,34 @@ func (e *Etcd) serveMetrics() (err error) {
etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)

for _, murl := range e.cfg.ListenMetricsUrls {
u := murl

Check warning on line 889 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L889

Added line #L889 was not covered by tests
ml, err := e.createMetricsListener(murl)
if err != nil {
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
e.wg.Add(1)
go func(u url.URL, ln net.Listener) {
defer e.wg.Done()

e.startHandler(func() error {

Check warning on line 896 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L896

Added line #L896 was not covered by tests
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
)
e.errHandler(http.Serve(ln, metricsMux))
}(murl, ml)
return http.Serve(ml, metricsMux)
})

Check warning on line 902 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L901-L902

Added lines #L901 - L902 were not covered by tests
}
}
return nil
}

func (e *Etcd) startHandler(handler func() error) {
// start each handler in a separate goroutine
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.errHandler(handler())
}()
}

func (e *Etcd) errHandler(err error) {
if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
Expand Down
44 changes: 24 additions & 20 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ type serveCtx struct {
wg sync.WaitGroup
}

func (sctx *serveCtx) startHandler(errHandler func(error), handler func() error) {
// start each handler in a separate goroutine
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
err := handler()
if errHandler != nil {
errHandler(err)
}
}()
}

type servers struct {
secure bool
grpc *grpc.Server
Expand Down Expand Up @@ -192,19 +204,15 @@ func (sctx *serveCtx) serve(
server = m.Serve

httpl := m.Match(cmux.HTTP1())
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsLis net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)
sctx.startHandler(errHandler, func() error {
return srv.Serve(httpl)
})

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
sctx.wg.Add(1)
go func(gs *grpc.Server, l net.Listener) {
defer sctx.wg.Done()
errHandler(gs.Serve(l))
}(gs, grpcl)
sctx.startHandler(errHandler, func() error {
return gs.Serve(grpcl)
})
}
}

Expand Down Expand Up @@ -266,11 +274,9 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsl net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
sctx.startHandler(errHandler, func() error {
return srv.Serve(tlsl)
})
}

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -283,7 +289,6 @@ func (sctx *serveCtx) serve(

err = server()
sctx.close()
// ensure all goroutines, which are created by this method, to complete before this method returns.
sctx.wg.Wait()
return err
}
Expand Down Expand Up @@ -354,9 +359,7 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
return nil, err
}
}
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
sctx.startHandler(nil, func() error {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
sctx.lg.Warn(
Expand All @@ -365,7 +368,8 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
zap.Error(cerr),
)
}
}()
return nil
})

return gwmux, nil
}
Expand Down

0 comments on commit f330406

Please sign in to comment.