-
-
Notifications
You must be signed in to change notification settings - Fork 140
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
We're refactoring the old `api/workers` into a proper package under `pkg`. The new package includes two interfaces: `Server`, which registers for process events, and `Client`, which submits events. A server can listen for events by calling `HandleTask`. Cron jobs are also supported via `HandleCron`. Several structs and methods have been added to facilitate task and cron processing. The new `worker/asynq` package has been created to implement these interfaces. The package uses Asynq as the backend for the worker. Different implementations can be created to implement the interfaces. Example usage: ```go func main() { server := asynq.NewServer("...") defer server.Shutdown() server.HandleTask( "queue:kind", func(_ context.Context, payload []byte) error { fmt.Println("Executing task with payload "+string(payload)) return nil }, ) if err := server.Start(); err != nil { panic(err) } client, err := asynq.NewClient("...") if err != nil { panic(err) } defer client.Close() if err := client.Submit( context.Background(), "queue:kind", []byte("payload"), ); err != nil { panic(err) } os.Exit(0) } ```
- Loading branch information
1 parent
1ff0a9c
commit 82e415b
Showing
19 changed files
with
732 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package asynq | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/hibiken/asynq" | ||
"github.com/shellhub-io/shellhub/pkg/worker" | ||
) | ||
|
||
type client struct { | ||
asynqClient *asynq.Client | ||
} | ||
|
||
func NewClient(redisURI string) (worker.Client, error) { | ||
opt, err := asynq.ParseRedisURI(redisURI) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
c := &client{asynqClient: asynq.NewClient(opt)} | ||
|
||
if c.asynqClient == nil { | ||
return nil, worker.ErrClientStartFailed | ||
} | ||
|
||
return c, nil | ||
} | ||
|
||
func (c *client) Close() error { | ||
return c.asynqClient.Close() | ||
} | ||
|
||
func (c *client) Submit(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { | ||
if !pattern.Validate() { | ||
return worker.ErrTaskPatternInvalid | ||
} | ||
|
||
task := asynq.NewTask(pattern.String(), payload) | ||
if _, err := c.asynqClient.EnqueueContext(ctx, task, asynq.Queue(pattern.Queue())); err != nil { | ||
return worker.ErrSubmitFailed | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *client) SubmitToBatch(ctx context.Context, pattern worker.TaskPattern, payload []byte) error { | ||
if !pattern.Validate() { | ||
return worker.ErrTaskPatternInvalid | ||
} | ||
|
||
task := asynq.NewTask(pattern.String(), payload) | ||
if _, err := c.asynqClient.EnqueueContext(ctx, task, asynq.Queue(pattern.Queue()), asynq.Group(pattern.String())); err != nil { | ||
return worker.ErrSubmitFailed | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package asynq_test | ||
|
||
import ( | ||
"context" | ||
"runtime" | ||
"testing" | ||
"time" | ||
|
||
asynqlib "github.com/hibiken/asynq" | ||
"github.com/shellhub-io/shellhub/pkg/worker/asynq" | ||
"github.com/stretchr/testify/require" | ||
"github.com/testcontainers/testcontainers-go/modules/redis" | ||
) | ||
|
||
func TestClient(t *testing.T) { | ||
t.Parallel() | ||
ctx := context.Background() | ||
|
||
// Start the redis container | ||
redisContainer, err := redis.Run(ctx, "docker.io/redis:7") | ||
require.NoError(t, err) | ||
|
||
t.Cleanup(func() { | ||
require.NoError(t, redisContainer.Terminate(ctx)) | ||
}) | ||
|
||
redisConnStr, err := redisContainer.ConnectionString(ctx) | ||
require.NoError(t, err) | ||
|
||
// Setup server and handlers | ||
addr, err := asynqlib.ParseRedisURI(redisConnStr) | ||
require.NoError(t, err) | ||
|
||
asynqMux := asynqlib.NewServeMux() | ||
asynqSrv := asynqlib.NewServer( | ||
addr, | ||
asynqlib.Config{ //nolint:exhaustruct | ||
Concurrency: runtime.NumCPU(), | ||
Queues: map[string]int{"queue": 1}, | ||
}, | ||
) | ||
|
||
assertTaskPayload := "" | ||
asynqMux.HandleFunc("queue:kind", func(_ context.Context, t *asynqlib.Task) error { | ||
assertTaskPayload = string(t.Payload()) | ||
|
||
return nil | ||
}) | ||
|
||
require.NoError(t, asynqSrv.Start(asynqMux)) | ||
|
||
// Setup client | ||
client, err := asynq.NewClient(redisConnStr) | ||
require.NoError(t, err) | ||
defer client.Close() | ||
|
||
require.NoError(t, client.Submit(ctx, "queue:kind", []byte("task was called"))) | ||
time.Sleep(10 * time.Second) | ||
require.Equal(t, "task was called", assertTaskPayload) | ||
} |
Oops, something went wrong.