diff --git a/agent/go.mod b/agent/go.mod index 648bcc51292..07d634a242d 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -22,7 +22,6 @@ require ( github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/creack/pty v1.1.18 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/docker v27.0.3+incompatible // indirect @@ -60,6 +59,7 @@ require ( golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect + gotest.tools/v3 v3.5.1 // indirect ) replace github.com/shellhub-io/shellhub => ../ diff --git a/agent/go.sum b/agent/go.sum index 22b81d6d041..47e83e37062 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -198,10 +198,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= -google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= -google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/api/go.sum b/api/go.sum index c7eff13e7f5..90f55b8a0ee 100644 --- a/api/go.sum +++ b/api/go.sum @@ -140,9 +140,8 @@ github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgo github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -225,8 +224,8 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= -github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/rwtodd/Go.Sed v0.0.0-20210816025313-55464686f9ef/go.mod h1:8AEUvGVi2uQ5b24BIhcr0GCcpd/RNAFWaN2CJFrWIIQ= github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= diff --git a/cli/go.sum b/cli/go.sum index a0297261663..fcd9a2c50ee 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -44,9 +44,8 @@ github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdf github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -107,9 +106,8 @@ github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgo github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -180,8 +178,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= -github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/rwtodd/Go.Sed v0.0.0-20210816025313-55464686f9ef/go.mod h1:8AEUvGVi2uQ5b24BIhcr0GCcpd/RNAFWaN2CJFrWIIQ= github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= diff --git a/go.mod b/go.mod index 34f43bf3650..bcd9784d2db 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 github.com/Masterminds/semver v1.5.0 + github.com/adhocore/gronx v1.8.1 github.com/creack/pty v1.1.18 github.com/docker/docker v27.0.3+incompatible github.com/gliderlabs/ssh v0.3.5 @@ -27,16 +28,24 @@ require ( github.com/sethvargo/go-envconfig v0.9.0 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 + github.com/testcontainers/testcontainers-go/modules/redis v0.32.0 golang.org/x/crypto v0.22.0 golang.org/x/sys v0.19.0 ) require ( + dario.cat/mergo v1.0.0 // indirect + github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/Microsoft/hcsshim v0.11.5 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/containerd/containerd v1.7.18 // indirect + github.com/containerd/errdefs v0.1.0 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect @@ -44,9 +53,9 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -55,12 +64,16 @@ require ( github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/kr/fs v0.1.0 // indirect - github.com/kr/pretty v0.3.1 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/leodido/go-urn v1.2.2 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/nwaples/rardecode v1.1.3 // indirect @@ -69,10 +82,17 @@ require ( github.com/oschwald/maxminddb-golang v1.10.0 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/redis/go-redis/v9 v9.0.3 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect + github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/shirou/gopsutil/v3 v3.23.12 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/spf13/cast v1.3.1 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/testcontainers/testcontainers-go v0.32.0 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect github.com/ulikunitz/xz v0.5.11 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect @@ -80,6 +100,7 @@ require ( github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect @@ -90,7 +111,8 @@ require ( golang.org/x/sync v0.6.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.5.1 // indirect ) diff --git a/pkg/worker/asynq/client.go b/pkg/worker/asynq/client.go new file mode 100644 index 00000000000..901c621be2b --- /dev/null +++ b/pkg/worker/asynq/client.go @@ -0,0 +1,57 @@ +package asynq + +import ( + "context" + + "github.com/hibiken/asynq" + "github.com/shellhub-io/shellhub/pkg/worker" +) + +type client struct { + asynqClient *asynq.Client +} + +func NewClient(redisURI string) (worker.Client, error) { + opt, err := asynq.ParseRedisURI(redisURI) + if err != nil { + return nil, err + } + + c := &client{asynqClient: asynq.NewClient(opt)} + + if c.asynqClient == nil { + return nil, worker.ErrClientStartFailed + } + + return c, nil +} + +func (c *client) Close() error { + return c.asynqClient.Close() +} + +func (c *client) Submit(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { + if !pattern.Validate() { + return worker.ErrTaskPatternInvalid + } + + task := asynq.NewTask(pattern.String(), payload) + if _, err := c.asynqClient.EnqueueContext(ctx, task, asynq.Queue(pattern.Queue())); err != nil { + return worker.ErrSubmitFailed + } + + return nil +} + +func (c *client) SubmitToBatch(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { + if !pattern.Validate() { + return worker.ErrTaskPatternInvalid + } + + task := asynq.NewTask(pattern.String(), payload) + if _, err := c.asynqClient.EnqueueContext(ctx, task, asynq.Queue(pattern.Queue()), asynq.Group(pattern.String())); err != nil { + return worker.ErrSubmitFailed + } + + return nil +} diff --git a/pkg/worker/asynq/client_test.go b/pkg/worker/asynq/client_test.go new file mode 100644 index 00000000000..f3f7b9390ad --- /dev/null +++ b/pkg/worker/asynq/client_test.go @@ -0,0 +1,60 @@ +package asynq_test + +import ( + "context" + "runtime" + "testing" + "time" + + asynqlib "github.com/hibiken/asynq" + "github.com/shellhub-io/shellhub/pkg/worker/asynq" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/modules/redis" +) + +func TestClient(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // Start the redis container + redisContainer, err := redis.Run(ctx, "docker.io/redis:7") + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, redisContainer.Terminate(ctx)) + }) + + redisConnStr, err := redisContainer.ConnectionString(ctx) + require.NoError(t, err) + + // Setup server and handlers + addr, err := asynqlib.ParseRedisURI(redisConnStr) + require.NoError(t, err) + + asynqMux := asynqlib.NewServeMux() + asynqSrv := asynqlib.NewServer( + addr, + asynqlib.Config{ //nolint:exhaustruct + Concurrency: runtime.NumCPU(), + Queues: map[string]int{"queue": 1}, + }, + ) + + assertTaskPayload := "" + asynqMux.HandleFunc("queue:kind", func(_ context.Context, t *asynqlib.Task) error { + assertTaskPayload = string(t.Payload()) + + return nil + }) + + require.NoError(t, asynqSrv.Start(asynqMux)) + + // Setup client + client, err := asynq.NewClient(redisConnStr) + require.NoError(t, err) + defer client.Close() + + require.NoError(t, client.Submit(ctx, "queue:kind", []byte("task was called"))) + time.Sleep(10 * time.Second) + require.Equal(t, "task was called", assertTaskPayload) +} diff --git a/pkg/worker/asynq/server.go b/pkg/worker/asynq/server.go new file mode 100644 index 00000000000..656d9ad7cbd --- /dev/null +++ b/pkg/worker/asynq/server.go @@ -0,0 +1,146 @@ +package asynq + +import ( + "runtime" + "time" + + "github.com/hibiken/asynq" + "github.com/shellhub-io/shellhub/pkg/uuid" + "github.com/shellhub-io/shellhub/pkg/worker" +) + +type ServerOption func(s *server) error + +// BatchConfig sets the batch configuration of the server. It's required when +// setting a task with [BatchTask] option. +// +// maxSize is the maximum number of tasks that a batch task can handle before +// processing. +// +// maxDelay is the maximum amount of time that a batch task can wait before +// processing. +// +// gracePeriod is the amount of time that the server will wait before aggregating +// batch tasks. +func BatchConfig(maxSize, maxDelay, gracePeriod int) ServerOption { + return func(s *server) error { + s.batchConfig.maxSize = maxSize + s.batchConfig.maxDelay = time.Second * time.Duration(maxDelay) + s.batchConfig.gracePeriod = time.Second * time.Duration(gracePeriod) + + return nil + } +} + +type server struct { + redisURI string + asynqSrv *asynq.Server + asynqMux *asynq.ServeMux + asynqSch *asynq.Scheduler + batchConfig *batchConfig + + queues queues + tasks []worker.Task + cronjobs []worker.Cronjob +} + +func NewServer(redisURI string, opts ...ServerOption) worker.Server { + s := &server{ + redisURI: redisURI, + queues: queues{cronQueue: 1}, + tasks: []worker.Task{}, + cronjobs: []worker.Cronjob{}, + batchConfig: &batchConfig{}, + } + + for _, opt := range opts { + if err := opt(s); err != nil { + return nil // NOTE: currently all opts returns nil + } + } + + return s +} + +func (s *server) HandleTask(pattern worker.TaskPattern, handler worker.TaskHandler, opts ...worker.TaskOption) { + pattern.MustValidate() + + if _, ok := s.queues[pattern.Queue()]; !ok { + s.queues[pattern.Queue()] = 1 + } + + task := worker.Task{Pattern: pattern, Handler: handler} + for _, opt := range opts { + opt(&task) + } + + s.tasks = append(s.tasks, task) +} + +func (s *server) HandleCron(spec worker.CronSpec, handler worker.CronHandler) { + spec.MustValidate() + + cronjob := worker.Cronjob{ + Identifier: uuid.Generate(), + Spec: spec, + Handler: handler, + } + + s.cronjobs = append(s.cronjobs, cronjob) +} + +func (s *server) Start() error { + if err := s.setupAsynq(); err != nil { + return err + } + + if err := s.asynqSrv.Start(s.asynqMux); err != nil { + return worker.ErrServerStartFailed + } + + if err := s.asynqSch.Start(); err != nil { + return worker.ErrServerStartFailed + } + + return nil +} + +func (s *server) Shutdown() { + s.asynqSrv.Shutdown() + s.asynqSch.Shutdown() +} + +func (s *server) setupAsynq() error { + addr, err := asynq.ParseRedisURI(s.redisURI) + if err != nil { + return err + } + + s.asynqSch = asynq.NewScheduler(addr, nil) + s.asynqMux = asynq.NewServeMux() + s.asynqSrv = asynq.NewServer( + addr, + asynq.Config{ //nolint:exhaustruct + Concurrency: runtime.NumCPU(), + Queues: s.queues, + GroupAggregator: asynq.GroupAggregatorFunc(aggregate), + GroupMaxSize: s.batchConfig.maxSize, + GroupMaxDelay: s.batchConfig.maxDelay, + GroupGracePeriod: s.batchConfig.gracePeriod, + }, + ) + + for _, t := range s.tasks { + s.asynqMux.HandleFunc(t.Pattern.String(), taskToAsynq(t.Handler)) + } + + for _, c := range s.cronjobs { + s.asynqMux.HandleFunc(c.Identifier, cronToAsynq(c.Handler)) + task := asynq.NewTask(c.Identifier, nil, asynq.Queue(cronQueue)) + if _, err := s.asynqSch.Register(c.Spec.String(), task); err != nil { + return worker.ErrHandleCronFailed + } + } + + return nil +} diff --git a/pkg/worker/asynq/server_test.go b/pkg/worker/asynq/server_test.go new file mode 100644 index 00000000000..ddfa1672d9b --- /dev/null +++ b/pkg/worker/asynq/server_test.go @@ -0,0 +1,60 @@ +package asynq_test + +import ( + "context" + "testing" + "time" + + asynqlib "github.com/hibiken/asynq" + "github.com/shellhub-io/shellhub/pkg/worker/asynq" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/modules/redis" +) + +func TestServer(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // Start the redis container + redisContainer, err := redis.Run(ctx, "docker.io/redis:7") + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, redisContainer.Terminate(ctx)) + }) + + redisConnStr, err := redisContainer.ConnectionString(ctx) + require.NoError(t, err) + + // Setup server and handlers + srv := asynq.NewServer(redisConnStr, asynq.BatchConfig(2, 1, 1)) + defer srv.Shutdown() + + assertTaskPayload := "" + srv.HandleTask("queue:task", func(_ context.Context, payload []byte) error { + assertTaskPayload = string(payload) + + return nil + }) + + assertCronPayload := "" + srv.HandleCron("* * * * *", func(_ context.Context) error { + assertCronPayload = "cron was called" + + return nil + }) + + require.NoError(t, srv.Start()) + + // Setup asynq client and enqueue task + opt, err := asynqlib.ParseRedisURI(redisConnStr) + require.NoError(t, err) + asynqClient := asynqlib.NewClient(opt) + _, err = asynqClient.Enqueue(asynqlib.NewTask("queue:task", []byte("task was called")), asynqlib.Queue("queue")) + require.NoError(t, err) + + // Assert that tasks was called. We sleep for 1 minute to wait the server process the cronjob + time.Sleep(1 * time.Minute) + require.Equal(t, assertTaskPayload, "task was called") + require.Equal(t, assertCronPayload, "cron was called") +} diff --git a/pkg/worker/asynq/task.go b/pkg/worker/asynq/task.go new file mode 100644 index 00000000000..48c51d859b8 --- /dev/null +++ b/pkg/worker/asynq/task.go @@ -0,0 +1,22 @@ +package asynq + +import ( + "github.com/shellhub-io/shellhub/pkg/worker" +) + +// BatchTask configures a task to process a list of tasks in batches. +// Each task payload will be aggregated, separated by '\n'. Example: +// +// func(ctx context.Context, payload []byte) error { +// scanner := bufio.NewScanner(bytes.NewReader(payload)) +// scanner.Split(bufio.ScanLines) +// +// for scanner.Scan() { +// // Process each task payload +// } +// } +func BatchTask() worker.TaskOption { + return func(t *worker.Task) { + t.Pattern += ":batch" + } +} diff --git a/pkg/worker/asynq/utils.go b/pkg/worker/asynq/utils.go new file mode 100644 index 00000000000..bc0f963b64f --- /dev/null +++ b/pkg/worker/asynq/utils.go @@ -0,0 +1,56 @@ +package asynq + +import ( + "bytes" + "context" + "time" + + "github.com/hibiken/asynq" + "github.com/shellhub-io/shellhub/pkg/worker" +) + +// batchConfig configures the asynq batch settings. +type batchConfig struct { + // maxSize is the maximum number of tasks that a batch task can handle before + // processing. + maxSize int + // maxDelay is the maximum amount of time that a batch task can wait before + // processing. + maxDelay time.Duration + // gracePeriod is the amount of time that the server will wait before aggregating + // batch tasks. + gracePeriod time.Duration +} + +// queues is a map of queues where the key is the name and the value is the priority. +type queues map[string]int + +// cronQueue is the queue where's all the cronjobs will send tasks. +const cronQueue = "cron" + +// aggregate is the handler that Asynq will execute to aggregate the tasks. +// It will combine all task payloads into one, separated by '\n', and then +// execute a new task with the name "{group}+:aggregated". +func aggregate(group string, tasks []*asynq.Task) *asynq.Task { + buf := new(bytes.Buffer) + for _, t := range tasks { + buf.Write(t.Payload()) + buf.WriteByte('\n') + } + + return asynq.NewTask(group+":batch", buf.Bytes()) +} + +// cronToAsynq converts a [github.com/shellhub-io/shellhub/pkg/api/worker.CronHandler] to an asynq handler. +func cronToAsynq(h worker.CronHandler) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, _ *asynq.Task) error { + return h(ctx) + } +} + +// taskToAsynq converts a [github.com/shellhub-io/shellhub/pkg/api/worker.TaskHandler] to an asynq handler. +func taskToAsynq(h worker.TaskHandler) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, task *asynq.Task) error { + return h(ctx, task.Payload()) + } +} diff --git a/pkg/worker/client.go b/pkg/worker/client.go new file mode 100644 index 00000000000..e4569524904 --- /dev/null +++ b/pkg/worker/client.go @@ -0,0 +1,22 @@ +package worker + +import ( + "context" +) + +// Client represents a client that submits tasks to be handled by the server. +type Client interface { + // Submit sends a payload to be processed by the task handler registered with the specified pattern. + // The task will be executed immediately if it matches the pattern. + // + // It returns an error if the pattern is invalid or if there is an issue submitting the task. + Submit(ctx context.Context, pattern TaskPattern, payload []byte) error + // SubmitToBatch sends a payload to be added to a batch for processing. The task handler registered with + // the specified pattern will process the batch either when a series of payloads have been enqueued + // or when the specified time delay is reached. + // + // It returns an error if the pattern is invalid or if there is an issue submitting the task to the batch. + SubmitToBatch(ctx context.Context, pattern TaskPattern, payload []byte) error + // Close closes the client's connection. + Close() error +} diff --git a/pkg/worker/cron.go b/pkg/worker/cron.go new file mode 100644 index 00000000000..c11b9d8036b --- /dev/null +++ b/pkg/worker/cron.go @@ -0,0 +1,41 @@ +package worker + +import ( + "context" + + "github.com/adhocore/gronx" +) + +type CronSpec string + +func (cs CronSpec) String() string { + return string(cs) +} + +// _gron is only used to validate cron expressions. It's initialized outside of the +// checker to avoid multiples allocs of the struct. +var _gron = gronx.New() + +// Validate reports whether the pattern is valid or not. +func (cs CronSpec) Validate() bool { + return _gron.IsValid(cs.String()) +} + +// MustValidate is similar to [CronSpec.Validate] but panics when invalid. +func (cs CronSpec) MustValidate() { + if !cs.Validate() { + panic("invalid cron specification: " + cs) + } +} + +type CronHandler func(ctx context.Context) error + +type Cronjob struct { + // Identifier is a UUID for the cron job, used internally to register the task with the + // scheduler. + Identifier string + // Spec is the cron expression that defines the schedule for the cron job. + Spec CronSpec + // Handler is the callback function that will be executed when the cron specification is met. + Handler CronHandler +} diff --git a/pkg/worker/errors.go b/pkg/worker/errors.go new file mode 100644 index 00000000000..0b485b3f667 --- /dev/null +++ b/pkg/worker/errors.go @@ -0,0 +1,12 @@ +package worker + +import "errors" + +var ( + ErrHandleCronFailed = errors.New("failed to handle cron") + ErrServerStartFailed = errors.New("failed to start the worker server") + ErrClientStartFailed = errors.New("failed to start the worker client") + ErrTaskPatternInvalid = errors.New("task pattern is invalid") + ErrCronSpecInvalid = errors.New("cron specification is invalid") + ErrSubmitFailed = errors.New("failed to submit the payload") +) diff --git a/pkg/worker/mocks/client.go b/pkg/worker/mocks/client.go new file mode 100644 index 00000000000..e75b9ea1ff9 --- /dev/null +++ b/pkg/worker/mocks/client.go @@ -0,0 +1,83 @@ +// Code generated by mockery v2.43.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + worker "github.com/shellhub-io/shellhub/pkg/worker" + mock "github.com/stretchr/testify/mock" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Client) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Submit provides a mock function with given fields: ctx, pattern, payload +func (_m *Client) Submit(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { + ret := _m.Called(ctx, pattern, payload) + + if len(ret) == 0 { + panic("no return value specified for Submit") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, worker.TaskPattern, []byte) error); ok { + r0 = rf(ctx, pattern, payload) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SubmitToBatch provides a mock function with given fields: ctx, pattern, payload +func (_m *Client) SubmitToBatch(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { + ret := _m.Called(ctx, pattern, payload) + + if len(ret) == 0 { + panic("no return value specified for SubmitToBatch") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, worker.TaskPattern, []byte) error); ok { + r0 = rf(ctx, pattern, payload) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewClient(t interface { + mock.TestingT + Cleanup(func()) +}) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/worker/mocks/server.go b/pkg/worker/mocks/server.go new file mode 100644 index 00000000000..6e3a99e219c --- /dev/null +++ b/pkg/worker/mocks/server.go @@ -0,0 +1,67 @@ +// Code generated by mockery v2.43.1. DO NOT EDIT. + +package mocks + +import ( + worker "github.com/shellhub-io/shellhub/pkg/worker" + mock "github.com/stretchr/testify/mock" +) + +// Server is an autogenerated mock type for the Server type +type Server struct { + mock.Mock +} + +// HandleCron provides a mock function with given fields: spec, cronFunc +func (_m *Server) HandleCron(spec worker.CronSpec, cronFunc worker.CronHandler) { + _m.Called(spec, cronFunc) +} + +// HandleTask provides a mock function with given fields: pattern, cb, opts +func (_m *Server) HandleTask(pattern worker.TaskPattern, cb worker.TaskHandler, opts ...worker.TaskOption) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, pattern, cb) + _ca = append(_ca, _va...) + _m.Called(_ca...) +} + +// Shutdown provides a mock function with given fields: +func (_m *Server) Shutdown() { + _m.Called() +} + +// Start provides a mock function with given fields: +func (_m *Server) Start() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewServer creates a new instance of Server. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewServer(t interface { + mock.TestingT + Cleanup(func()) +}) *Server { + mock := &Server{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/worker/server.go b/pkg/worker/server.go new file mode 100644 index 00000000000..1e0b23535c4 --- /dev/null +++ b/pkg/worker/server.go @@ -0,0 +1,23 @@ +package worker + +// Server represents a process that handles tasks and cronjobs. A [Client] can submit +// tasks. +type Server interface { + // HandleTask registers a task with the provided pattern. The task will be executed + // every time a client enqueues a payload matching the pattern. Batch tasks will be executed + // when the client enqueues a series of payloads or when the specified time delay is reached. + // + // It panics if the pattern is invalid. Batch tasks are specified in options. + HandleTask(pattern TaskPattern, cb TaskHandler, opts ...TaskOption) + // HandleCron executes the cronFunc every time the cron specification is met. + // + // It panics if the cron specification is invalid. + HandleCron(spec CronSpec, cronFunc CronHandler) + // Start initializes and starts the worker in a non-blocking manner. The server is + // turned off whedn the context was done. + // + // It returns an error if any issues occur during the startup process. + Start() error + // Shutdown gracefully shuts down the server. + Shutdown() +} diff --git a/pkg/worker/task.go b/pkg/worker/task.go new file mode 100644 index 00000000000..91e59a96d21 --- /dev/null +++ b/pkg/worker/task.go @@ -0,0 +1,42 @@ +package worker + +import ( + "context" + "strings" +) + +// TaskPattern represents a pattern to which a task can register to listen. +// It must be in the format "queue:kind". +type TaskPattern string + +func (tp TaskPattern) String() string { + return string(tp) +} + +// Validate reports whether the pattern is valid or not. +func (tp TaskPattern) Validate() bool { + return len(strings.Split(string(tp), ":")) == 2 +} + +// MustValidate is similar to [TaskPattern.Validate] but panics when invalid. +func (tp TaskPattern) MustValidate() { + if !tp.Validate() { + panic("invalid task pattern: " + tp) + } +} + +// Queue returns the queue component of the pattern. +func (tp TaskPattern) Queue() string { + return strings.Split(string(tp), ":")[0] +} + +type TaskHandler func(ctx context.Context, payload []byte) error + +type Task struct { + // Pattern is a string to which the task can listen to message/events. + Pattern TaskPattern + // Handler is the callback that the task will execute when receiving messages/events. + Handler TaskHandler +} + +type TaskOption func(t *Task) diff --git a/ssh/go.sum b/ssh/go.sum index 095306ef6ea..afe22ca3c90 100644 --- a/ssh/go.sum +++ b/ssh/go.sum @@ -45,8 +45,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hibiken/asynq v0.24.1 h1:+5iIEAyA9K/lcSPvx3qoPtsKJeKI5u9aOIvUmSsazEw= github.com/hibiken/asynq v0.24.1/go.mod h1:u5qVeSbrnfT+vtG5Mq8ZPzQu/BmCKMHvTGb91uy9Tts= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -72,8 +72,8 @@ github.com/redis/go-redis/v9 v9.0.3 h1:+7mmR26M0IvyLxGZUHxu4GiBkJkVDid0Un+j4ScYu github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= -github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rwtodd/Go.Sed v0.0.0-20210816025313-55464686f9ef/go.mod h1:8AEUvGVi2uQ5b24BIhcr0GCcpd/RNAFWaN2CJFrWIIQ= github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=