Skip to content

Commit

Permalink
refactor(api,ssh,pkg): abstract internal client's asynq.Client
Browse files Browse the repository at this point in the history
To support the new worker package, we are replacing the direct usage of
`asynq.Client` with the new `worker.Client` (#3979). The client
initialization process has also been updated to return an error if the
worker cannot be created.
  • Loading branch information
heiytor committed Jul 25, 2024
1 parent 09fd7b8 commit 5fad073
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 71 deletions.
2 changes: 1 addition & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/cnf/structhash v0.0.0-20201127153200-e1b16c1ebc08
github.com/getsentry/sentry-go v0.28.1
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/hibiken/asynq v0.24.1
github.com/labstack/echo/v4 v4.12.0
github.com/labstack/gommon v0.4.2
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -58,6 +57,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hibiken/asynq v0.24.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
Expand Down
9 changes: 6 additions & 3 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func startSentry(dsn string) (*sentry.Client, error) {
func startServer(ctx context.Context, cfg *config, store store.Store, cache storecache.Cache) error {
log.Info("Starting API server")

requestClient := requests.NewClient()
requestClient, err := requests.NewClient()
if err != nil {
log.WithError(err).
Fatal("failed to create the internalclient")
}

var locator geoip.Locator
if cfg.GeoIP {
Expand Down Expand Up @@ -210,8 +214,7 @@ func startServer(ctx context.Context, cfg *config, store store.Store, cache stor
router.Close()
}()

err := router.Start(":8080") //nolint:errcheck

err = router.Start(":8080") //nolint:errcheck
log.WithError(err).Info("HTTP server closed")

return nil
Expand Down
63 changes: 10 additions & 53 deletions pkg/api/internalclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,10 @@ import (
"net/http"

resty "github.com/go-resty/resty/v2"
"github.com/hibiken/asynq"
"github.com/shellhub-io/shellhub/pkg/worker"
"github.com/sirupsen/logrus"
)

type client struct {
http *resty.Client
logger *logrus.Logger
asynq *asynq.Client
}

type Client interface {
deviceAPI
namespaceAPI
Expand All @@ -26,8 +20,11 @@ type Client interface {
firewallAPI
}

// Ensures the client implements Client.
var _ Client = (*client)(nil)
type client struct {
http *resty.Client
logger *logrus.Logger
worker worker.Client
}

const (
DeviceUIDHeader = "X-Device-UID"
Expand All @@ -39,16 +36,7 @@ var (
ErrUnknown = errors.New("unknown error")
)

// Options wraps injectable values to a new API internal client.
// NOTE(r): This is a workaround to inject the Asynq client to the API internal client, because the [client] structure
// and its properties are privated.
type Options struct {
Asynq *asynq.Client
}

type Opt func(*Options) error

func NewClient(opts ...Opt) Client {
func NewClient(opts ...clientOption) (Client, error) {
httpClient := resty.New()
httpClient.SetBaseURL("http://api:8080")
httpClient.SetRetryCount(math.MaxInt32)
Expand All @@ -61,46 +49,15 @@ func NewClient(opts ...Opt) Client {
})

c := &client{http: httpClient}

o := new(Options)
for _, opt := range opts {
if err := opt(o); err != nil {
return nil
if err := opt(c); err != nil {
return nil, err
}
}

if o.Asynq != nil {
c.asynq = o.Asynq
}

if c.logger != nil {
httpClient.SetLogger(&LeveledLogger{c.logger})
}

return c
}

// NewClientWithAsynq creates a new API internal client with the Asynq client injected to turn the API internal client
// able to enqueue ping tasks to the Asynq server and late process by API server.
//
// It uses the [NewClient] function to create a new API internal client and injects the Asynq client to it through the
// [Options] structure.
func NewClientWithAsynq(uri string) Client {
return NewClient(func(o *Options) error {
// The internal client used by the SSH server needs to be able to enqueue tasks to the Asynq server, due that,
// we must set the Asynq client to the internal client as a configuration function.
options, err := asynq.ParseRedisURI(uri)
if err != nil {
return err
}

client := asynq.NewClient(options)
if client == nil {
return errors.New("failed to create Asynq client")
}

o.Asynq = client

return nil
})
return c, nil
}
8 changes: 4 additions & 4 deletions pkg/api/internalclient/device.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package internalclient

import (
"context"
"errors"
"fmt"
"net/http"

"github.com/go-resty/resty/v2"
"github.com/hibiken/asynq"
"github.com/shellhub-io/shellhub/pkg/clock"
"github.com/shellhub-io/shellhub/pkg/models"
"github.com/shellhub-io/shellhub/pkg/worker"
)

type deviceAPI interface {
Expand Down Expand Up @@ -45,10 +46,9 @@ func (c *client) DevicesOffline(uid string) error {
}

func (c *client) DevicesHeartbeat(tenant, uid string) error {
payload := []byte(fmt.Sprintf("%s:%s=%d", tenant, uid, clock.Now().Unix()))
_, err := c.asynq.Enqueue(asynq.NewTask("api:heartbeat", payload), asynq.Queue("api"), asynq.Group("api:heartbeat:batch"))
payload := fmt.Sprintf("%s:%s=%d", tenant, uid, clock.Now().Unix())

return err
return c.worker.SubmitToBatch(context.TODO(), worker.TaskPattern("api:heartbeat"), []byte(payload))
}

func (c *client) Lookup(lookup map[string]string) (string, []error) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/api/internalclient/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package internalclient

import "github.com/shellhub-io/shellhub/pkg/worker/asynq"

type clientOption func(c *client) error

func WithAsynqWorker(redisURI string) clientOption { //nolint:revive
return func(c *client) error {
asynqClient, err := asynq.NewClient(redisURI)
if err != nil {
return err
}

c.worker = asynqClient

return nil
}
}
1 change: 1 addition & 0 deletions ssh/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
)

require (
github.com/adhocore/gronx v1.8.1 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
Loading

0 comments on commit 5fad073

Please sign in to comment.