Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent,pkg): add connector mode to agent #3115

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions agent/connector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package connector

import (
"context"
)

// Container is a struct that represents a container that will be managed by the connector.
type Container struct {
// ID is the container ID.
ID string
// ServerAddress is the ShellHub address of the server that the agent will connect to.
ServerAddress string
// Tenant is the tenant ID of the namespace that the agent belongs to.
Tenant string
// PrivateKey is the private key of the device. Specify the path to store the container private key. If not
// provided, the agent will generate a new one. This is required.
PrivateKey string
// Cancel is a function that is used to stop the goroutine that is running the agent for this container.
Cancel context.CancelFunc
}

// Connector is an interface that defines the methods that a connector must implement.
type Connector interface {
// List lists all containers running on the host.
List(ctx context.Context) ([]string, error)
// Start starts the agent for the container with the given ID.
Start(ctx context.Context, id string)
// Stop stops the agent for the container with the given ID.
Stop(ctx context.Context, id string)
// Listen listens for events and starts or stops the agent for the container that was created or removed.
Listen(ctx context.Context) error
}
222 changes: 222 additions & 0 deletions agent/connector/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package connector

import (
"context"
"fmt"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
dockerclient "github.com/docker/docker/client"
"github.com/shellhub-io/shellhub/pkg/agent"
log "github.com/sirupsen/logrus"
)

var _ Connector = new(DockerConnector)

// DockerConnector is a struct that represents a connector that uses Docker as the container runtime.
type DockerConnector struct {
mu sync.Mutex
// server is the ShellHub address of the server that the agent will connect to.
server string
// tenant is the tenant ID of the namespace that the agent belongs to.
tenant string
// cli is the Docker client.
cli *dockerclient.Client
// privateKeys is the path to the directory that contains the private keys for the containers.
privateKeys string
// cancels is a map that contains the cancel functions for each container.
// This is used to stop the agent for a container, marking as done its context and closing the agent.
cancels map[string]context.CancelFunc
}

// NewDockerConnector creates a new [Connector] that uses Docker as the container runtime.
func NewDockerConnector(server string, tenant string, privateKey string) (Connector, error) {
cli, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
if err != nil {
return nil, err
}

return &DockerConnector{
server: server,
tenant: tenant,
cli: cli,
privateKeys: privateKey,
cancels: make(map[string]context.CancelFunc),
}, nil
}

// events returns the docker events.
func (d *DockerConnector) events(ctx context.Context) (<-chan events.Message, <-chan error) {
return d.cli.Events(ctx, types.EventsOptions{})
}

func (d *DockerConnector) List(ctx context.Context) ([]string, error) {
containers, err := d.cli.ContainerList(ctx, types.ContainerListOptions{})
if err != nil {
return nil, err
}

list := make([]string, len(containers))
for i, container := range containers {
list[i] = container.ID
}

return list, nil
}

// Start starts the agent for the container with the given ID.
func (d *DockerConnector) Start(ctx context.Context, id string) {
id = id[:12]

d.mu.Lock()
ctx, d.cancels[id] = context.WithCancel(ctx)
d.mu.Unlock()

privateKey := fmt.Sprintf("%s/%s.key", d.privateKeys, id)
go initContainerAgent(ctx, Container{
ID: id,
ServerAddress: d.server,
Tenant: d.tenant,
PrivateKey: privateKey,
Cancel: d.cancels[id],
})
}

// Stop stops the agent for the container with the given ID.
func (d *DockerConnector) Stop(_ context.Context, id string) {
id = id[:12]

d.mu.Lock()
defer d.mu.Unlock()

cancel, ok := d.cancels[id]
if ok {
cancel()
delete(d.cancels, id)
}
}

// Listen listens for events and starts or stops the agent for the containers.
func (d *DockerConnector) Listen(ctx context.Context) error {
containers, err := d.List(ctx)
if err != nil {
return err
}

for _, container := range containers {
d.Start(ctx, container)
}

events, errs := d.events(ctx)
for {
select {
case <-ctx.Done():
return nil
case err := <-errs:
return err
case container := <-events:
// NOTICE: "start" and "die" Docker's events are call every time a new container start or stop,
// independently how the command was run. For example, if a container was started with `docker run -d`, the
// "start" event will be called, but if the same container was started with `docker start <container-id>`,
// the "start" event will be called too. The same happens with the "die" event.
switch container.Action {
case "start":
d.Start(ctx, container.ID)
case "die":
d.Stop(ctx, container.ID)
}
}
}
}

// initContainerAgent initializes the agent for a container.
func initContainerAgent(ctx context.Context, container Container) {
cfg := &agent.Config{
ServerAddress: container.ServerAddress,
TenantID: container.Tenant,
PrivateKey: container.PrivateKey,
PreferredHostname: container.ID,
PreferredIdentity: container.ID,
Mode: agent.ModeConnector,
KeepAliveInterval: 30,
}

log.WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
"timestamp": time.Now(),
}).Info("Connector container started")

ag, err := agent.NewAgentWithConfig(cfg)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"id": container.ID,
"configuration": cfg,
}).Fatal("Failed to create agent")
}

if err := ag.Initialize(); err != nil {
log.WithError(err).WithFields(log.Fields{
"id": container.ID,
"configuration": cfg,
}).Fatal("Failed to initialize agent")
}

go func() {
if err := ag.Ping(ctx, 0); err != nil {
log.WithError(err).WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
"timestamp": time.Now(),
}).Fatal("Failed to ping server")
}

log.WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
"timestamp": time.Now(),
}).Info("Stopped pinging server")
}()

log.WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
"timestamp": time.Now(),
}).Info("Listening for connections")

// NOTICE(r): listing for connection and wait for a channel message to close the agent. It will receives
// this mensagem when something out of this goroutine send a `done`, what will cause the agent closes
// and no more connection to be allowed until it be started again.
if err := ag.Listen(ctx); err != nil {
log.WithError(err).WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
"timestamp": time.Now(),
}).Fatal("Failed to listen for connections")
}

log.WithFields(log.Fields{
"id": container.ID,
"identity": cfg.PreferredIdentity,
"hostname": cfg.PreferredHostname,
"tenant_id": cfg.TenantID,
"server_address": cfg.ServerAddress,
}).Info("Connector container done")
}
47 changes: 47 additions & 0 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"runtime"
"time"

"github.com/Masterminds/semver"
"github.com/kelseyhightower/envconfig"
"github.com/shellhub-io/shellhub/agent/connector"
"github.com/shellhub-io/shellhub/pkg/agent"
"github.com/shellhub-io/shellhub/pkg/agent/pkg/selfupdater"
"github.com/shellhub-io/shellhub/pkg/envs"
Expand Down Expand Up @@ -263,6 +265,51 @@ It is initialized by the agent when a new SFTP session is created.`,
},
})

rootCmd.AddCommand(&cobra.Command{ // nolint: exhaustruct
Use: "connector",
Short: "Starts the Connector",
Long: "Starts the Connector, a special kind of Agent that turns your docker containers into ShellHub devices.",
Run: func(cmd *cobra.Command, args []string) {
cfg, err := envs.ParseWithPrefix[agent.ConfigConnector]("shellhub")
if err != nil {
envconfig.Usage("shellhub", &cfg) // nolint:errcheck
log.Fatal(err)
}

cfg.PrivateKeys = path.Dir(cfg.PrivateKeys)

log.WithFields(log.Fields{
"version": AgentVersion,
"address": cfg.ServerAddress,
"tenant_id": cfg.TenantID,
"private_keys": cfg.PrivateKeys,
}).Info("Starting ShellHub Connector")

connector, err := connector.NewDockerConnector(cfg.ServerAddress, cfg.TenantID, cfg.PrivateKeys)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"version": AgentVersion,
"address": cfg.ServerAddress,
"tenant_id": cfg.TenantID,
}).Fatal("Failed to create connector")
}

if err := connector.Listen(cmd.Context()); err != nil {
log.WithError(err).WithFields(log.Fields{
"version": AgentVersion,
"address": cfg.ServerAddress,
"tenant_id": cfg.TenantID,
}).Fatal("Failed to listen for connections")
}

log.WithFields(log.Fields{
"version": AgentVersion,
"address": cfg.ServerAddress,
"tenant_id": cfg.TenantID,
}).Info("Connector stopped")
},
})

rootCmd.Version = AgentVersion

rootCmd.SetVersionTemplate(fmt.Sprintf("{{ .Name }} version: {{ .Version }}\ngo: %s\n",
Expand Down
Loading
Loading