-
-
Notifications
You must be signed in to change notification settings - Fork 140
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(agent,pkg): add connector mode to agent
What is the ShellHub Connector? ShellHub Connector is a new kind of ShellHub Agent that turns [Docker](https://www.docker.com/) containers into ShellHub Devices. It instances a new ShellHub Agent, in memory, for each container running, redirecting the SSH IO, connections in general, and authentication credentials to its file system. > As most of the containers don't have passwords set for its users per default, it rejects the connection every time for users without credentials. What is working now? This initial implementation has support for these kinds of connections: - [x] Shell - [x] Exec - [ ] Heredoc - [ ] SFTP Running Connector To initialize the ShellHub Connector, enter the `agent/` directory, build and run the agent's binary with *connector* sub command. ```sh go build -ldflags "-X main.AgentVersion=latest" -o agent && SERVER_ADDRESS="http://localhost/" PRIVATE_KEYS="/tmp/shellhub/" TENANT_ID="00000000-0000-4000-0000-000000000000" ./agent connector ``` Environmental variables To configure the ShellHub Connector, you can/must provide these environmental variables. - SERVER_ADDRESS (**required**) Set the ShellHub server address the agent will use to connect - PRIVATE_KEYS (**required**) Specify the path to store the devices/containers private keys. If not provided, the agent will generate a new one. - TENANT_ID (**required**) Sets the account tenant ID used during communication to associate the devices to a specific tenant. - KEEPALIVE_INTERVAL Determine the interval to send the keep alive message to the server. This has a direct impact of the bandwidth used by the device when in idle state. Docker notes As this implementation uses the Docker Client, you have the possibility to override the environment variables provided by docker to change some behaviors, but the ShellHub Connector doesn't guarantee its right operation when this is done. Check [Docker Client documentation about this](https://pkg.go.dev/github.com/docker/[email protected]+incompatible/client#FromEnv) for more information
- Loading branch information
1 parent
aa256f6
commit 49429fd
Showing
10 changed files
with
959 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package connector | ||
|
||
import ( | ||
"context" | ||
) | ||
|
||
// Container is a struct that represents a container that will be managed by the connector. | ||
type Container struct { | ||
// ID is the container ID. | ||
ID string | ||
// ServerAddress is the ShellHub address of the server that the agent will connect to. | ||
ServerAddress string | ||
// Tenant is the tenant ID of the namespace that the agent belongs to. | ||
Tenant string | ||
// PrivateKey is the private key of the device. Specify the path to store the container private key. If not | ||
// provided, the agent will generate a new one. This is required. | ||
PrivateKey string | ||
// Cancel is a function that is used to stop the goroutine that is running the agent for this container. | ||
Cancel context.CancelFunc | ||
} | ||
|
||
// Connector is an interface that defines the methods that a connector must implement. | ||
type Connector interface { | ||
// List lists all containers running on the host. | ||
List(ctx context.Context) ([]string, error) | ||
// Start starts the agent for the container with the given ID. | ||
Start(ctx context.Context, id string) | ||
// Stop stops the agent for the container with the given ID. | ||
Stop(ctx context.Context, id string) | ||
// Listen listens for events and starts or stops the agent for the container that was created or removed. | ||
Listen(ctx context.Context) error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
package connector | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/docker/docker/api/types" | ||
"github.com/docker/docker/api/types/events" | ||
dockerclient "github.com/docker/docker/client" | ||
"github.com/shellhub-io/shellhub/pkg/agent" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
var _ Connector = new(DockerConnector) | ||
|
||
// DockerConnector is a struct that represents a connector that uses Docker as the container runtime. | ||
type DockerConnector struct { | ||
mu sync.RWMutex | ||
// server is the ShellHub address of the server that the agent will connect to. | ||
server string | ||
// tenant is the tenant ID of the namespace that the agent belongs to. | ||
tenant string | ||
// cli is the Docker client. | ||
cli *dockerclient.Client | ||
// privateKeys is the path to the directory that contains the private keys for the containers. | ||
privateKeys string | ||
// cancels is a map that contains the cancel functions for each container. | ||
// This is used to stop the agent for a container, marking as done its context and closing the agent. | ||
cancels map[string]context.CancelFunc | ||
} | ||
|
||
// NewDockerConnector creates a new [Connector] that uses Docker as the container runtime. | ||
func NewDockerConnector(server string, tenant string, privateKey string) (Connector, error) { | ||
cli, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &DockerConnector{ | ||
server: server, | ||
tenant: tenant, | ||
cli: cli, | ||
privateKeys: privateKey, | ||
cancels: make(map[string]context.CancelFunc), | ||
}, nil | ||
} | ||
|
||
// events returns the docker events. | ||
func (d *DockerConnector) events(ctx context.Context) (<-chan events.Message, <-chan error) { | ||
return d.cli.Events(ctx, types.EventsOptions{}) | ||
} | ||
|
||
func (d *DockerConnector) List(ctx context.Context) ([]string, error) { | ||
containers, err := d.cli.ContainerList(ctx, types.ContainerListOptions{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
list := make([]string, len(containers)) | ||
for i, container := range containers { | ||
list[i] = container.ID | ||
} | ||
|
||
return list, nil | ||
} | ||
|
||
// Start starts the agent for the container with the given ID. | ||
func (d *DockerConnector) Start(ctx context.Context, id string) { | ||
id = id[:12] | ||
|
||
d.mu.Lock() | ||
ctx, d.cancels[id] = context.WithCancel(ctx) | ||
d.mu.Unlock() | ||
|
||
privateKey := fmt.Sprintf("%s/%s.key", d.privateKeys, id) | ||
go initContainerAgent(ctx, Container{ | ||
ID: id, | ||
ServerAddress: d.server, | ||
Tenant: d.tenant, | ||
PrivateKey: privateKey, | ||
Cancel: d.cancels[id], | ||
}) | ||
} | ||
|
||
// Stop stops the agent for the container with the given ID. | ||
func (d *DockerConnector) Stop(ctx context.Context, id string) { | ||
id = id[:12] | ||
|
||
d.mu.RLock() | ||
cancel, ok := d.cancels[id] | ||
if ok { | ||
cancel() | ||
} | ||
d.mu.RUnlock() | ||
|
||
delete(d.cancels, id) | ||
} | ||
|
||
// Listen listens for events and starts or stops the agent for the containers. | ||
func (d *DockerConnector) Listen(ctx context.Context) error { | ||
containers, err := d.List(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, container := range containers { | ||
d.Start(ctx, container) | ||
} | ||
|
||
events, errs := d.events(ctx) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return nil | ||
case err := <-errs: | ||
return err | ||
case container := <-events: | ||
switch container.Action { | ||
case "start": | ||
d.Start(ctx, container.ID) | ||
case "kill": | ||
d.Stop(ctx, container.ID) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// initContainerAgent initializes the agent for a container. | ||
func initContainerAgent(ctx context.Context, container Container) { | ||
cfg := &agent.Config{ | ||
ServerAddress: container.ServerAddress, | ||
TenantID: container.Tenant, | ||
PrivateKey: container.PrivateKey, | ||
PreferredHostname: container.ID, | ||
PreferredIdentity: container.ID, | ||
Mode: agent.ModeConnector, | ||
KeepAliveInterval: 30, | ||
} | ||
|
||
log.WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
"timestamp": time.Now(), | ||
}).Info("Connector container started") | ||
|
||
ag, err := agent.NewAgentWithConfig(cfg) | ||
if err != nil { | ||
log.WithError(err).WithFields(log.Fields{ | ||
"id": container.ID, | ||
"configuration": cfg, | ||
}).Fatal("Failed to create agent") | ||
} | ||
|
||
if err := ag.Initialize(); err != nil { | ||
log.WithError(err).WithFields(log.Fields{ | ||
"id": container.ID, | ||
"configuration": cfg, | ||
}).Fatal("Failed to initialize agent") | ||
} | ||
|
||
listening := make(chan bool) | ||
go func() { | ||
// NOTICE: We only start to ping the server when the agent is ready to accept connections. | ||
// It will make the agent ping to server after the ticker time set on ping function, what is 10 minutes by | ||
// default. | ||
<-listening | ||
|
||
if err := ag.Ping(ctx, nil); err != nil { | ||
log.WithError(err).WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
"timestamp": time.Now(), | ||
}).Fatal("Failed to ping server") | ||
} | ||
|
||
log.WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
"timestamp": time.Now(), | ||
}).Info("Stopped pinging server") | ||
}() | ||
|
||
log.WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
"timestamp": time.Now(), | ||
}).Info("Listening for connections") | ||
|
||
// NOTICE(r): listing for connection and wait for a channel message to close the agent. It will receives | ||
// this mensagem when something out of this goroutine send a `done`, what will cause the agent closes | ||
// and no more connection to be allowed until it be started again. | ||
if err := ag.Listen(ctx, listening); err != nil { | ||
log.WithError(err).WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
"timestamp": time.Now(), | ||
}).Fatal("Failed to listen for connections") | ||
} | ||
|
||
log.WithFields(log.Fields{ | ||
"id": container.ID, | ||
"identity": cfg.PreferredIdentity, | ||
"hostname": cfg.PreferredHostname, | ||
"tenant_id": cfg.TenantID, | ||
"server_address": cfg.ServerAddress, | ||
}).Info("Connector container done") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.