Skip to content

Commit

Permalink
chore(api): rewrite workers in service
Browse files Browse the repository at this point in the history
  • Loading branch information
heiytor authored and gustavosbarreto committed Jul 26, 2024
1 parent 79f9cb1 commit 446cfc8
Show file tree
Hide file tree
Showing 2 changed files with 270 additions and 0 deletions.
106 changes: 106 additions & 0 deletions api/services/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package services

import (
"bufio"
"bytes"
"context"
"strconv"
"strings"
"time"

"github.com/shellhub-io/shellhub/pkg/clock"
"github.com/shellhub-io/shellhub/pkg/models"
"github.com/shellhub-io/shellhub/pkg/worker"
log "github.com/sirupsen/logrus"
)

const (
TaskDevicesHeartbeat = worker.TaskPattern("api:heartbeat")
TaskCleanupSessions = worker.TaskPattern("api:cleanup-sessions")
)

// Device Heartbeat sets the device status to "online". It processes in batch.
func (s *service) DevicesHeartbeat() worker.TaskHandler {
return func(ctx context.Context, payload []byte) error {
log.WithField("task", TaskDevicesHeartbeat.String()).
Info("executing heartbeat task")

scanner := bufio.NewScanner(bytes.NewReader(payload))
scanner.Split(bufio.ScanLines)

devices := make([]models.ConnectedDevice, 0)
for scanner.Scan() {
parts := strings.Split(scanner.Text(), "=")
if len(parts) != 2 {
log.WithField("task", TaskDevicesHeartbeat.String()).
Warn("failed to parse queue payload due to lack of '='.")

continue
}

lastSeen, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
log.WithField("task", TaskDevicesHeartbeat.String()).
WithError(err).
Warn("failed to parse timestamp to integer.")

continue
}

parts = strings.Split(parts[0], ":")
if len(parts) != 2 {
log.WithField("task", TaskDevicesHeartbeat.String()).
Warn("failed to parse queue payload due to lack of ':'.")

continue
}

device := models.ConnectedDevice{
UID: parts[1],
TenantID: parts[0],
LastSeen: time.Unix(lastSeen, 0),
}

devices = append(devices, device)
}

if err := s.store.DeviceSetOnline(ctx, devices); err != nil {
log.WithField("task", TaskDevicesHeartbeat.String()).
WithError(err).
Error("failed to complete the heartbeat task")

return err
}

log.WithField("task", TaskDevicesHeartbeat.String()).
Info("finishing heartbeat task")

return nil
}
}

// CleanupSessions removes sessions older than the specified number of retention days.
func (s *service) CleanupSessions(retention int) worker.CronHandler {
return func(ctx context.Context) error {
log.WithField("task", TaskCleanupSessions.String()).
Info("executing cleanup task")

lte := clock.Now().UTC().AddDate(0, 0, retention*(-1))
deletedCount, updatedCount, err := s.store.SessionDeleteRecordFrameByDate(ctx, lte)
if err != nil {
log.WithField("task", TaskCleanupSessions.String()).
WithError(err).
Error("failed to cleanup sessions")

return err
}

log.WithField("task", TaskCleanupSessions.String()).
WithField("lte", lte.String()).
WithField("deleted_count", deletedCount).
WithField("updated_count", updatedCount).
Info("finishing cleanup task")

return nil
}
}
164 changes: 164 additions & 0 deletions api/services/task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package services

import (
"context"
"errors"
"testing"
"time"

storemocks "github.com/shellhub-io/shellhub/api/store/mocks"
"github.com/shellhub-io/shellhub/pkg/cache"
"github.com/shellhub-io/shellhub/pkg/clock"
clockmocks "github.com/shellhub-io/shellhub/pkg/clock/mocks"
"github.com/shellhub-io/shellhub/pkg/models"
"github.com/stretchr/testify/require"
)

func TestService_DevicesHeartbeat(t *testing.T) {
storeMock := new(storemocks.Store)

cases := []struct {
description string
payload []byte
requiredMocks func(context.Context)
expected error
}{
{
description: "fails when cannot set the status",
payload: []byte("00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000000=1721912837\n00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000001=1721912837"),
requiredMocks: func(ctx context.Context) {
storeMock.
On("DeviceSetOnline", ctx, []models.ConnectedDevice{
{
UID: "0000000000000000000000000000000000000000000000000000000000000000",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
{
UID: "0000000000000000000000000000000000000000000000000000000000000001",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
}).
Return(errors.New("error")).
Once()
},
expected: errors.New("error"),
},
{
description: "succeeds but one payload does not have ':'",
payload: []byte("00000000-0000-4000-0000-0000000000000000000000000000000000000000000000000000000000000000000000000000=1721912837\n00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000001=1721912837"),
requiredMocks: func(ctx context.Context) {
storeMock.
On("DeviceSetOnline", ctx, []models.ConnectedDevice{
{
UID: "0000000000000000000000000000000000000000000000000000000000000001",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
}).
Return(nil).
Once()
},
expected: nil,
},
{
description: "succeeds but one payload does not have '='",
payload: []byte("00000000-0000-4000-0000-000000000000:00000000000000000000000000000000000000000000000000000000000000001721912837\n00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000001=1721912837"),
requiredMocks: func(ctx context.Context) {
storeMock.
On("DeviceSetOnline", ctx, []models.ConnectedDevice{
{
UID: "0000000000000000000000000000000000000000000000000000000000000001",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
}).
Return(nil).
Once()
},
expected: nil,
},
{
description: "succeeds",
payload: []byte("00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000000=1721912837\n00000000-0000-4000-0000-000000000000:0000000000000000000000000000000000000000000000000000000000000001=1721912837"),
requiredMocks: func(ctx context.Context) {
storeMock.
On("DeviceSetOnline", ctx, []models.ConnectedDevice{
{
UID: "0000000000000000000000000000000000000000000000000000000000000000",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
{
UID: "0000000000000000000000000000000000000000000000000000000000000001",
TenantID: "00000000-0000-4000-0000-000000000000",
LastSeen: time.Unix(1721912837, 0),
},
}).
Return(nil).
Once()
},
expected: nil,
},
}

s := NewService(storeMock, privateKey, publicKey, cache.NewNullCache(), clientMock, nil)

for _, tc := range cases {
t.Run(tc.description, func(tt *testing.T) {
ctx := context.Background()
tc.requiredMocks(ctx)
require.Equal(tt, tc.expected, s.DevicesHeartbeat()(ctx, tc.payload))
})
}
}

func TestService_CleanupSessions(t *testing.T) {
storeMock := new(storemocks.Store)
clockMock := new(clockmocks.Clock)

clock.DefaultBackend = clockMock
now := time.Now()
clockMock.On("Now").Return(now)

cases := []struct {
description string
retention int
requiredMocks func(context.Context)
expected error
}{
{
description: "fails",
retention: 30,
requiredMocks: func(ctx context.Context) {
storeMock.
On("SessionDeleteRecordFrameByDate", ctx, now.UTC().AddDate(0, 0, 30*(-1))).
Return(int64(0), int64(0), errors.New("error")).
Once()
},
expected: errors.New("error"),
},
{
description: "succeeds",
retention: 30,
requiredMocks: func(ctx context.Context) {
storeMock.
On("SessionDeleteRecordFrameByDate", ctx, now.UTC().AddDate(0, 0, 30*(-1))).
Return(int64(30), int64(0), nil).
Once()
},
expected: nil,
},
}

s := NewService(storeMock, privateKey, publicKey, cache.NewNullCache(), clientMock, nil)

for _, tc := range cases {
t.Run(tc.description, func(tt *testing.T) {
ctx := context.Background()
tc.requiredMocks(ctx)
require.Equal(tt, tc.expected, s.CleanupSessions(tc.retention)(ctx))
})
}
}

0 comments on commit 446cfc8

Please sign in to comment.