diff --git a/api/go.mod b/api/go.mod index 228d6a1dac9..ab11595428d 100644 --- a/api/go.mod +++ b/api/go.mod @@ -28,6 +28,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/hcsshim v0.12.2 // indirect + github.com/adhocore/gronx v1.8.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/api/go.sum b/api/go.sum index 90f55b8a0ee..ce5d8380869 100644 --- a/api/go.sum +++ b/api/go.sum @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.12.2 h1:AcXy+yfRvrx20g9v7qYaJv5Rh+8GaHOS6b8G6Wx/nKs= github.com/Microsoft/hcsshim v0.12.2/go.mod h1:RZV12pcHCXQ42XnlQ3pz6FZfmrC1C+R4gaOHhRNML1g= +github.com/adhocore/gronx v1.8.1 h1:F2mLTG5sB11z7vplwD4iydz3YCEjstSfYmCrdSm3t6A= +github.com/adhocore/gronx v1.8.1/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= @@ -267,6 +269,8 @@ github.com/testcontainers/testcontainers-go v0.32.0 h1:ug1aK08L3gCHdhknlTTwWjPHP github.com/testcontainers/testcontainers-go v0.32.0/go.mod h1:CRHrzHLQhlXUsa5gXjTOfqIEJcrK5+xMDmBr/WMI88E= github.com/testcontainers/testcontainers-go/modules/mongodb v0.32.0 h1:DvmvHV1irfNIVBhixeTAcoaWCvmdkoNQxRmZisqic4E= github.com/testcontainers/testcontainers-go/modules/mongodb v0.32.0/go.mod h1:z0ZvM2V2iThZGrzEN6sddJpvnGhJd6O1O0FTFoZXmpk= +github.com/testcontainers/testcontainers-go/modules/redis v0.32.0 h1:HW5Qo9qfLi5iwfS7cbXwG6qe8ybXGePcgGPEmVlVDlo= 
+github.com/testcontainers/testcontainers-go/modules/redis v0.32.0/go.mod h1:5kltdxVKZG0aP1iegeqKz4K8HHyP0wbkW5o84qLyMjY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= diff --git a/api/server.go b/api/server.go index 2e5feb9016d..2fa85fbe478 100644 --- a/api/server.go +++ b/api/server.go @@ -13,10 +13,11 @@ import ( "github.com/shellhub-io/shellhub/api/store" "github.com/shellhub-io/shellhub/api/store/mongo" "github.com/shellhub-io/shellhub/api/store/mongo/options" - "github.com/shellhub-io/shellhub/api/workers" requests "github.com/shellhub-io/shellhub/pkg/api/internalclient" storecache "github.com/shellhub-io/shellhub/pkg/cache" "github.com/shellhub-io/shellhub/pkg/geoip" + workerlib "github.com/shellhub-io/shellhub/pkg/worker" + "github.com/shellhub-io/shellhub/pkg/worker/asynq" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -61,13 +62,6 @@ var serverCmd = &cobra.Command{ log.Info("Connected to MongoDB") - worker, err := workers.New(store) - if err != nil { - log.WithError(err).Warn("Failed to create workers.") - } - - worker.Start(ctx) - go func() { sig := <-sigs @@ -99,9 +93,28 @@ type config struct { GeoIP bool `env:"GEOIP,default=false"` GeoIPMaxMindLicense string `env:"MAXMIND_LICENSE,default="` // Session record cleanup worker schedule - SessionRecordCleanupSchedule string `env:"SESSION_RECORD_CLEANUP_SCHEDULE,default=@daily"` // Sentry DSN. SentryDSN string `env:"SENTRY_DSN,default="` + // AsynqGroupMaxDelay is the maximum duration to wait before processing a group of tasks. + // + // Its time unit is second. + // + // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. 
+ AsynqGroupMaxDelay int `env:"ASYNQ_GROUP_MAX_DELAY,default=1"` + // AsynqGroupGracePeriod is how long the Asynq server waits for another task to join a group before aggregating it. The grace period has a configurable upper bound: you can set a maximum aggregation delay (see AsynqGroupMaxDelay), after which Asynq server + // will aggregate the tasks regardless of the remaining grace period. + // + // Its time unit is second. + // + // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. + AsynqGroupGracePeriod int64 `env:"ASYNQ_GROUP_GRACE_PERIOD,default=1"` + // AsynqGroupMaxSize is the maximum number of tasks that can be aggregated together. If that number is reached, Asynq + // server will aggregate the tasks immediately. + // + // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. + AsynqGroupMaxSize int `env:"ASYNQ_GROUP_MAX_SIZE,default=500"` + SessionRecordCleanupSchedule string `env:"SESSION_RECORD_CLEANUP_SCHEDULE,default=@daily"` + SessionRecordCleanupRetention int `env:"RECORD_RETENTION,default=0"` } // startSentry initializes the Sentry client. @@ -165,6 +178,17 @@ func startServer(ctx context.Context, cfg *config, store store.Store, cache stor routerOptions = append(routerOptions, routes.WithReporter(reporter)) } + worker := asynq.NewServer(cfg.RedisURI, asynq.BatchConfig(cfg.AsynqGroupMaxSize, cfg.AsynqGroupMaxDelay, int(cfg.AsynqGroupGracePeriod))) + worker.HandleTask(services.TaskDevicesHeartbeat, service.DevicesHeartbeat(), asynq.BatchTask()) + if cfg.SessionRecordCleanupRetention > 0 { + worker.HandleCron(workerlib.CronSpec(cfg.SessionRecordCleanupSchedule), service.CleanupSessions(cfg.SessionRecordCleanupRetention)) + } + + if err := worker.Start(); err != nil { + log.WithError(err). + Fatal("failed to start the worker") + } + router := routes.NewRouter(service, routerOptions...)
go func() { @@ -172,6 +196,7 @@ func startServer(ctx context.Context, cfg *config, store store.Store, cache stor log.Debug("Closing HTTP server due context cancellation") + worker.Shutdown() router.Close() }() diff --git a/api/workers/doc.go b/api/workers/doc.go deleted file mode 100644 index f8fd9e22a69..00000000000 --- a/api/workers/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -// Package workers manages the API's workers, setting up a new `asynq.Server` and binding it to the Redis instance. -// -// The core of this package is the `Worker` struct, created with the `New` function. Each -// instance of this struct includes its own server and scheduler. The `Start` method is -// responsible for setting up the necessary handlers and running the server. -// -// The `sessionCleanup` worker is designed to delete recorded sessions older than a specified number -// of days. The retention period is determined by the value of the `SHELLHUB_RECORD_RETENTION` environment -// variable. To disable this worker, set `SHELLHUB_RECORD_RETENTION` to 0 (default behavior). It uses -// a cron expression from `SHELLHUB_RECORD_RETENTION` to schedule its periodic execution. -// -// The `heartbeat` worker manages heartbeat tasks, signaling the online status of devices. -// It aggregates heartbeat data and updates the online status of devices accordingly. -// The maximum number of devices to wait for before triggering is defined by the `SHELLHUB_ASYNQ_GROUP_MAX_SIZE` (default is 500). -// Another triggering mechanism involves a timeout defined in the `SHELLHUB_ASYNQ_GROUP_MAX_DELAY` environment variable. -// -// The patterns of tasks used by the handlers are available as constants with the "Task" prefix. 
-package workers diff --git a/api/workers/heartbeat.go b/api/workers/heartbeat.go deleted file mode 100644 index 7dcc23caa4d..00000000000 --- a/api/workers/heartbeat.go +++ /dev/null @@ -1,94 +0,0 @@ -package workers - -import ( - "bufio" - "bytes" - "context" - "strconv" - "strings" - "time" - - "github.com/hibiken/asynq" - "github.com/shellhub-io/shellhub/pkg/models" - log "github.com/sirupsen/logrus" -) - -// heartbeat worker manages heartbeat tasks, signaling the online status of devices. -// It aggregates heartbeat data and updates the online status of devices accordingly. -// The maximum number of devices to wait for before triggering is defined by the `SHELLHUB_ASYNQ_GROUP_MAX_SIZE` (default is 500). -// Another triggering mechanism involves a timeout defined in the `SHELLHUB_ASYNQ_GROUP_MAX_DELAY` environment variable. -func (w *Workers) registerHeartbeat() { - w.mux.HandleFunc(TaskHeartbeat, func(ctx context.Context, task *asynq.Task) error { - log. - WithFields(log.Fields{ - "component": "worker", - "task": TaskHeartbeat, - }). - Trace("Executing heartbeat worker.") - - scanner := bufio.NewScanner(bytes.NewReader(task.Payload())) - scanner.Split(bufio.ScanLines) - - devices := make([]models.ConnectedDevice, 0) - for scanner.Scan() { - parts := strings.Split(scanner.Text(), "=") - if len(parts) != 2 { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskHeartbeat, - }). - Warn("failed to parse queue payload due to lack of '='.") - - continue - } - - lastSeen, err := strconv.ParseInt(parts[1], 10, 64) - if err != nil { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskHeartbeat, - }). - WithError(err). - Warn("failed to parse timestamp to integer.") - - continue - } - - parts = strings.Split(parts[0], ":") - if len(parts) != 2 { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskHeartbeat, - }). 
- Warn("failed to parse queue payload due to lack of ':'.") - - continue - } - - device := models.ConnectedDevice{ - UID: parts[1], - TenantID: parts[0], - LastSeen: time.Unix(lastSeen, 0), - } - - devices = append(devices, device) - } - - if err := w.store.DeviceSetOnline(ctx, devices); err != nil { - log. - WithError(err). - WithFields(log.Fields{ - "component": "worker", - "task": TaskHeartbeat, - }). - Error("failed to set devices as online") - - return err - } - - return nil - }) -} diff --git a/api/workers/session_cleanup.go b/api/workers/session_cleanup.go deleted file mode 100644 index 7d1eb9a90e2..00000000000 --- a/api/workers/session_cleanup.go +++ /dev/null @@ -1,74 +0,0 @@ -package workers - -import ( - "context" - "time" - - "github.com/hibiken/asynq" - log "github.com/sirupsen/logrus" -) - -// registerSessionCleanup worker is designed to delete recorded sessions older than a specified number -// of days. The retention period is determined by the value of the `SHELLHUB_RECORD_RETENTION` environment -// variable. To disable this worker, set `SHELLHUB_RECORD_RETENTION` to 0 (default behavior). It uses -// a cron expression from `SHELLHUB_RECORD_RETENTION` to schedule its periodic execution. -func (w *Workers) registerSessionCleanup() { - if w.env.SessionRecordCleanupRetention < 1 { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskSessionCleanup, - }). - Warnf("Aborting cleanup worker due to SHELLHUB_RECORD_RETENTION equal to %d.", w.env.SessionRecordCleanupRetention) - - return - } - - w.mux.HandleFunc(TaskSessionCleanup, func(ctx context.Context, _ *asynq.Task) error { - log.WithFields( - log.Fields{ - "component": "worker", - "cron_expression": w.env.SessionRecordCleanupSchedule, - "task": TaskSessionCleanup, - }). 
- Trace("Executing cleanup worker.") - - lte := time.Now().UTC().AddDate(0, 0, w.env.SessionRecordCleanupRetention*(-1)) - deletedCount, updatedCount, err := w.store.SessionDeleteRecordFrameByDate(ctx, lte) - if err != nil { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskSessionCleanup, - }). - WithError(err). - Error("Failed to delete recorded sessions") - - return err - } - - log.WithFields( - log.Fields{ - "component": "worker", - "cron_expression": w.env.SessionRecordCleanupSchedule, - "task": TaskSessionCleanup, - "lte": lte.String(), - "deleted_count": deletedCount, - "updated_count": updatedCount, - }). - Trace("Finishing cleanup worker.") - - return nil - }) - - task := asynq.NewTask(TaskSessionCleanup, nil, asynq.TaskID(TaskSessionCleanup), asynq.Queue("session_record")) - if _, err := w.scheduler.Register(w.env.SessionRecordCleanupSchedule, task); err != nil { - log.WithFields( - log.Fields{ - "component": "worker", - "task": TaskSessionCleanup, - }). - WithError(err). 
- Error("Failed to register the scheduler.") - } -} diff --git a/api/workers/tasks.go b/api/workers/tasks.go deleted file mode 100644 index cee9fe0c7ff..00000000000 --- a/api/workers/tasks.go +++ /dev/null @@ -1,6 +0,0 @@ -package workers - -const ( - TaskSessionCleanup = "session_record:cleanup" - TaskHeartbeat = "api:heartbeat" -) diff --git a/api/workers/util.go b/api/workers/util.go deleted file mode 100644 index ffa608265bb..00000000000 --- a/api/workers/util.go +++ /dev/null @@ -1,42 +0,0 @@ -package workers - -import ( - "github.com/shellhub-io/shellhub/pkg/envs" - log "github.com/sirupsen/logrus" -) - -type Envs struct { - MongoURI string `env:"MONGO_URI,default=mongodb://mongo:27017/main"` - RedisURI string `env:"REDIS_URI,default=redis://redis:6379"` - SessionRecordCleanupSchedule string `env:"SESSION_RECORD_CLEANUP_SCHEDULE,default=@daily"` - SessionRecordCleanupRetention int `env:"RECORD_RETENTION,default=0"` - // AsynqGroupMaxDelay is the maximum duration to wait before processing a group of tasks. - // - // Its time unit is second. - // - // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. - AsynqGroupMaxDelay int `env:"ASYNQ_GROUP_MAX_DELAY,default=1"` - // AsynqGroupGracePeriod is the grace period has configurable upper bound: you can set a maximum aggregation delay, after which Asynq server - // will aggregate the tasks regardless of the remaining grace period. - /// - // Its time unit is second. - // - // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. - AsynqGroupGracePeriod int64 `env:"ASYNQ_GROUP_GRACE_PERIOD,default=1"` - // AsynqGroupMaxSize is the maximum number of tasks that can be aggregated together. If that number is reached, Asynq - // server will aggregate the tasks immediately. - // - // Check [https://github.com/hibiken/asynq/wiki/Task-aggregation] for more information. 
- AsynqGroupMaxSize int `env:"ASYNQ_GROUP_MAX_SIZE,default=500"` -} - -func getEnvs() (*Envs, error) { - env, err := envs.ParseWithPrefix[Envs]("API_") - if err != nil { - log.WithError(err).Error("Failed to parse environment variables with prefix 'api'") - - return nil, err - } - - return env, nil -} diff --git a/api/workers/workers.go b/api/workers/workers.go deleted file mode 100644 index 0e35ba841f3..00000000000 --- a/api/workers/workers.go +++ /dev/null @@ -1,127 +0,0 @@ -package workers - -import ( - "context" - "fmt" - "runtime" - "strings" - "time" - - "github.com/hibiken/asynq" - "github.com/shellhub-io/shellhub/api/store" - log "github.com/sirupsen/logrus" -) - -type Workers struct { - store store.Store - - addr asynq.RedisConnOpt - srv *asynq.Server - mux *asynq.ServeMux - env *Envs - scheduler *asynq.Scheduler -} - -// New creates a new Workers instance with the provided store. It initializes -// the worker's components, such as server, scheduler, and environment settings. -func New(store store.Store) (*Workers, error) { - env, err := getEnvs() - if err != nil { - log.WithFields(log.Fields{"component": "worker"}). - WithError(err). - Error("Failed to parse the envs.") - - return nil, err - } - - addr, err := asynq.ParseRedisURI(env.RedisURI) - if err != nil { - log.WithFields(log.Fields{"component": "worker"}). - WithError(err). - Errorf("Failed to parse redis URI: %s.", env.RedisURI) - - return nil, err - } - - mux := asynq.NewServeMux() - srv := asynq.NewServer( - addr, - asynq.Config{ //nolint:exhaustruct - // NOTICE: - // To include any new task binding to a new queue (e.g., "queue:group" where 'queue' is the new queue), - // ensure that the created queue is added here. Failure to do so will result in the server not executing the task handler. 
- Queues: map[string]int{ - "api": 1, - "session_record": 1, - }, - GroupAggregator: asynq.GroupAggregatorFunc( - func(group string, tasks []*asynq.Task) *asynq.Task { - var b strings.Builder - - for _, task := range tasks { - b.WriteString(fmt.Sprintf("%s\n", task.Payload())) - } - - return asynq.NewTask(TaskHeartbeat, []byte(b.String())) - }, - ), - GroupMaxDelay: time.Duration(env.AsynqGroupMaxDelay) * time.Second, - GroupGracePeriod: time.Duration(env.AsynqGroupGracePeriod) * time.Second, - GroupMaxSize: env.AsynqGroupMaxSize, - Concurrency: runtime.NumCPU(), - }, - ) - scheduler := asynq.NewScheduler(addr, nil) - - w := &Workers{ - addr: addr, - env: env, - srv: srv, - mux: mux, - scheduler: scheduler, - store: store, - } - - return w, nil -} - -// Start initiates the server. It creates two new goroutines: one for the server itself -// and another for the scheduler. This method is also responsible for setting up all -// the server handlers. -func (w *Workers) Start(ctx context.Context) { - log.WithFields(log.Fields{"component": "worker"}).Info("Starting workers") - - w.setupHandlers() - - go func() { - if err := w.srv.Run(w.mux); err != nil { - log.WithFields(log.Fields{"component": "worker"}). - WithError(err). - Error("Unable to run the server.") - } - }() - - go func() { - if err := w.scheduler.Run(); err != nil { - log.WithFields(log.Fields{"component": "worker"}). - WithError(err). - Error("Unable to run the scheduler.") - } - }() - - go func() { - <-ctx.Done() - - log.Info("Shutdown workers") - - w.srv.Shutdown() - w.scheduler.Shutdown() - }() -} - -// setupHandlers is responsible for registering all the handlers of the server. It needs -// to be called before any initialization. 
-func (w *Workers) setupHandlers() { - w.registerSessionCleanup() - w.registerHeartbeat() -} diff --git a/pkg/api/internalclient/device.go b/pkg/api/internalclient/device.go index 5a1aa16f93b..2c1867278ef 100644 --- a/pkg/api/internalclient/device.go +++ b/pkg/api/internalclient/device.go @@ -46,7 +46,7 @@ func (c *client) DevicesOffline(uid string) error { func (c *client) DevicesHeartbeat(tenant, uid string) error { payload := []byte(fmt.Sprintf("%s:%s=%d", tenant, uid, clock.Now().Unix())) - _, err := c.asynq.Enqueue(asynq.NewTask("api:heartbeat", payload), asynq.Queue("api"), asynq.Group("heartbeats")) + _, err := c.asynq.Enqueue(asynq.NewTask("api:heartbeat", payload), asynq.Queue("api"), asynq.Group("api:heartbeat:batch")) return err }