Skip to content

Commit

Permalink
Support Running Orchestrator behind a Reverse Proxy (#4724)
Browse files Browse the repository at this point in the history
If we need to put an Orchestrator behind a TLS terminating reverse
proxy,the NATS server should be configurred in a very specific way, and
the NATS clients (compute nodes) should also be configured in a certain
way.

The NATS server should say the TLS is available, although it is not.
Also, the compute node should enforce TLS communication for NATS,
because reverse proxy supports TLS.

See link:
https://docs.nats.io/running-a-nats-service/configuration/securing_nats/tls#tls-terminating-reverse-proxies

Sample Orchestrator Node config:
```yaml
NameProvider: "uuid"
API:
  Port: 1234
Orchestrator:
  Enabled: true
  Auth:
    Token: "i_am_very_secret_token"
  SupportReverseProxy: true
```

Sample Compute Node Config:
```yaml
NameProvider: "uuid"
API:
  Port: 1234
Compute:
  Enabled: true
  Orchestrators:
    - nats://bacalhau-traefik-node:4222 
  Auth: Token: "i_am_very_secret_token" 
  TLS: 
    RequireTLS: true 
```

Please see the integration tests in this commit, it has a very detailed
test suite covering all cases.

Linear:
https://linear.app/expanso/issue/ENG-379/bacalhau-to-support-tls-behind-reverse-proxy

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced support for TLS communication and reverse proxy
configurations in compute and orchestrator nodes.
- Added new properties in the API schema to enhance configuration
options.

- **Bug Fixes**
- Improved error handling for NATS connections based on TLS
requirements.

- **Documentation**
- Updated Swagger API documentation to include new properties and
configurations.

- **Tests**
- Added a new test suite to validate orchestrator functionality behind a
reverse proxy.

- **Chores**
- Introduced new Docker Compose configurations for enhanced service
orchestration.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
jamlo authored Nov 29, 2024
1 parent 5e3ab41 commit dca8719
Show file tree
Hide file tree
Showing 21 changed files with 488 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .cspell/custom-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,5 @@ buildvcs
Nilf
IMDS
tlsca
Lenf
Lenf
traefik
3 changes: 3 additions & 0 deletions pkg/config/types/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type ComputeAuth struct {
type ComputeTLS struct {
// CACert specifies the CA file path that the compute node trusts when connecting to orchestrator.
CACert string `yaml:"CACert,omitempty" json:"CACert,omitempty"`

// RequireTLS specifies if the compute node enforces encrypted communication with orchestrator.
RequireTLS bool `yaml:"RequireTLS,omitempty" json:"RequireTLS,omitempty"`
}

type Heartbeat struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const ComputeHeartbeatIntervalKey = "Compute.Heartbeat.Interval"
const ComputeHeartbeatResourceUpdateIntervalKey = "Compute.Heartbeat.ResourceUpdateInterval"
const ComputeOrchestratorsKey = "Compute.Orchestrators"
const ComputeTLSCACertKey = "Compute.TLS.CACert"
const ComputeTLSRequireTLSKey = "Compute.TLS.RequireTLS"
const DataDirKey = "DataDir"
const DisableAnalyticsKey = "DisableAnalytics"
const EnginesDisabledKey = "Engines.Disabled"
Expand Down Expand Up @@ -92,6 +93,7 @@ const OrchestratorSchedulerHousekeepingIntervalKey = "Orchestrator.Scheduler.Hou
const OrchestratorSchedulerHousekeepingTimeoutKey = "Orchestrator.Scheduler.HousekeepingTimeout"
const OrchestratorSchedulerQueueBackoffKey = "Orchestrator.Scheduler.QueueBackoff"
const OrchestratorSchedulerWorkerCountKey = "Orchestrator.Scheduler.WorkerCount"
const OrchestratorSupportReverseProxyKey = "Orchestrator.SupportReverseProxy"
const OrchestratorTLSCACertKey = "Orchestrator.TLS.CACert"
const OrchestratorTLSServerCertKey = "Orchestrator.TLS.ServerCert"
const OrchestratorTLSServerKeyKey = "Orchestrator.TLS.ServerKey"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var ConfigDescriptions = map[string]string{
ComputeHeartbeatResourceUpdateIntervalKey: "ResourceUpdateInterval specifies the time between updates of resource information to the orchestrator.",
ComputeOrchestratorsKey: "Orchestrators specifies a list of orchestrator endpoints that this compute node connects to.",
ComputeTLSCACertKey: "CACert specifies the CA file path that the compute node trusts when connecting to orchestrator.",
ComputeTLSRequireTLSKey: "RequireTLS specifies if the compute node enforces encrypted communication with orchestrator.",
DataDirKey: "DataDir specifies a location on disk where the bacalhau node will maintain state.",
DisableAnalyticsKey: "DisableAnalytics, when true, disables sharing anonymous analytics data with the Bacalhau development team",
EnginesDisabledKey: "Disabled specifies a list of engines that are disabled.",
Expand Down Expand Up @@ -94,6 +95,7 @@ var ConfigDescriptions = map[string]string{
OrchestratorSchedulerHousekeepingTimeoutKey: "HousekeepingTimeout specifies the maximum time allowed for a single housekeeping run.",
OrchestratorSchedulerQueueBackoffKey: "QueueBackoff specifies the time to wait before retrying a failed job.",
OrchestratorSchedulerWorkerCountKey: "WorkerCount specifies the number of concurrent workers for job scheduling.",
OrchestratorSupportReverseProxyKey: "SupportReverseProxy configures the orchestrator node to run behind a reverse proxy",
OrchestratorTLSCACertKey: "CACert specifies the CA file path that the orchestrator node trusts when connecting to NATS server.",
OrchestratorTLSServerCertKey: "ServerCert specifies the certificate file path given to NATS server to serve TLS connections.",
OrchestratorTLSServerKeyKey: "ServerKey specifies the private key file path given to NATS server to serve TLS connections.",
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Orchestrator struct {
NodeManager NodeManager `yaml:"NodeManager,omitempty" json:"NodeManager,omitempty"`
Scheduler Scheduler `yaml:"Scheduler,omitempty" json:"Scheduler,omitempty"`
EvaluationBroker EvaluationBroker `yaml:"EvaluationBroker,omitempty" json:"EvaluationBroker,omitempty"`
// SupportReverseProxy configures the orchestrator node to run behind a reverse proxy
SupportReverseProxy bool `yaml:"SupportReverseProxy,omitempty" json:"SupportReverseProxy,omitempty"`
}

type OrchestratorAuth struct {
Expand Down
56 changes: 50 additions & 6 deletions pkg/nats/transport/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
"fmt"
"strings"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/compute"
"github.com/bacalhau-project/bacalhau/pkg/compute/logstream"
Expand All @@ -20,6 +16,9 @@ import (
nats_pubsub "github.com/bacalhau-project/bacalhau/pkg/nats/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/pubsub"
"github.com/bacalhau-project/bacalhau/pkg/routing"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)

const NodeInfoSubjectPrefix = "node.info."
Expand Down Expand Up @@ -60,8 +59,15 @@ type NATSTransportConfig struct {
// Used by the Nats Client when node acts as orchestrator
ServerTLSCACert string

// Use by the Nats Client when node acts as compute
// Used by the Nats Client when node acts as compute
ClientTLSCACert string

// Used to configure Orchestrator (actually the NATS server) to run behind
// a reverse proxy
ServerSupportReverseProxy bool

// Used to configure compute node nats client to require TLS connection
ComputeClientRequireTLS bool
}

func (c *NATSTransportConfig) Validate() error {
Expand Down Expand Up @@ -168,6 +174,21 @@ func NewNATSTransport(ctx context.Context,
serverOpts.TLSConfig = serverTLSConfig
}

if config.ServerSupportReverseProxy {
// If the ServerSupportReverseProxy is enabled, we need to set
// serverOpts.TLSConfig to an empty config, if it is null.
// Reason for that , unfortunately that's the only eay NATS server will
// work behind a reverse proxy, that's how NATS documentation recommends doing it.
// See: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/tls#tls-terminating-reverse-proxies
serverOpts.AllowNonTLS = true

// We need to make sure not to override TLS configuration if it was set. Maybe the operator want TLS
// between reverse proxy and NATS server, up to them.
if serverOpts.TLSConfig == nil {
serverOpts.TLSConfig, _ = server.GenTLSConfig(&server.TLSConfigOpts{})
}
}

// Only set cluster options if cluster peers are provided. Jetstream doesn't
// like the setting to be present with no values, or with values that are
// a local address (e.g. it can't RAFT to itself).
Expand All @@ -194,7 +215,25 @@ func NewNATSTransport(ctx context.Context,
return nil, err
}

config.Orchestrators = append(config.Orchestrators, sm.Server.ClientURL())
if config.ServerSupportReverseProxy {
// Server.ClientURL() (in core NATS code), will check if TLSConfig of the server
// is not null, and changes the URL Scheme from "nats" to "tls". When running
// the server with ServerSupportReverseProxy setting, almost all the time
// the NATS server will not be supporting TLS. This will make the orchestrator NATS client
// fail, since it was given the "tls://" NATS server URL to connect to, but the
// server does not support TLS. It is unfortunate that the ClientURL method does that.
// So here, we are checking, if NATS server was not started with a cert and key, and at the
// same time it was started with ServerSupportReverseProxy set to true, then we will change
// URL the scheme back to "nats://" from "tls://".

clientURL := sm.Server.ClientURL()
if strings.HasPrefix(clientURL, "tls://") && config.ServerTLSCert == "" {
clientURL = strings.Replace(clientURL, "tls://", "nats://", 1)
}
config.Orchestrators = append(config.Orchestrators, clientURL)
} else {
config.Orchestrators = append(config.Orchestrators, sm.Server.ClientURL())
}
}

nc, err := CreateClient(ctx, config)
Expand Down Expand Up @@ -273,6 +312,11 @@ func CreateClient(ctx context.Context, config *NATSTransportConfig) (*nats_helpe
nats.MaxReconnects(-1),
}

// When Compute Node requires TLS, enforce it
if config.ComputeClientRequireTLS {
clientOptions = append(clientOptions, nats.TLSHandshakeFirst())
}

// We need to do this logic since the Nats Transport Layer does not differentiate
// between orchestrator mode and compute mode
if config.ServerTLSCert == "" && config.ClientTLSCACert != "" {
Expand Down
36 changes: 19 additions & 17 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,23 +349,25 @@ func createTransport(ctx context.Context, cfg NodeConfig) (*nats_transport.NATST

// TODO: revisit how we setup the transport layer for compute only, orchestrator only and hybrid nodes
config := &nats_transport.NATSTransportConfig{
NodeID: cfg.NodeID,
Host: cfg.BacalhauConfig.Orchestrator.Host,
Port: cfg.BacalhauConfig.Orchestrator.Port,
AdvertisedAddress: cfg.BacalhauConfig.Orchestrator.Advertise,
AuthSecret: cfg.BacalhauConfig.Orchestrator.Auth.Token,
Orchestrators: cfg.BacalhauConfig.Compute.Orchestrators,
StoreDir: storeDir,
ClusterName: cfg.BacalhauConfig.Orchestrator.Cluster.Name,
ClusterPort: cfg.BacalhauConfig.Orchestrator.Cluster.Port,
ClusterPeers: cfg.BacalhauConfig.Orchestrator.Cluster.Peers,
ClusterAdvertisedAddress: cfg.BacalhauConfig.Orchestrator.Cluster.Advertise,
IsRequesterNode: cfg.BacalhauConfig.Orchestrator.Enabled,
ServerTLSCACert: cfg.BacalhauConfig.Orchestrator.TLS.CACert,
ServerTLSCert: cfg.BacalhauConfig.Orchestrator.TLS.ServerCert,
ServerTLSKey: cfg.BacalhauConfig.Orchestrator.TLS.ServerKey,
ServerTLSTimeout: cfg.BacalhauConfig.Orchestrator.TLS.ServerTimeout,
ClientTLSCACert: cfg.BacalhauConfig.Compute.TLS.CACert,
NodeID: cfg.NodeID,
Host: cfg.BacalhauConfig.Orchestrator.Host,
Port: cfg.BacalhauConfig.Orchestrator.Port,
AdvertisedAddress: cfg.BacalhauConfig.Orchestrator.Advertise,
AuthSecret: cfg.BacalhauConfig.Orchestrator.Auth.Token,
Orchestrators: cfg.BacalhauConfig.Compute.Orchestrators,
StoreDir: storeDir,
ClusterName: cfg.BacalhauConfig.Orchestrator.Cluster.Name,
ClusterPort: cfg.BacalhauConfig.Orchestrator.Cluster.Port,
ClusterPeers: cfg.BacalhauConfig.Orchestrator.Cluster.Peers,
ClusterAdvertisedAddress: cfg.BacalhauConfig.Orchestrator.Cluster.Advertise,
IsRequesterNode: cfg.BacalhauConfig.Orchestrator.Enabled,
ServerTLSCACert: cfg.BacalhauConfig.Orchestrator.TLS.CACert,
ServerTLSCert: cfg.BacalhauConfig.Orchestrator.TLS.ServerCert,
ServerTLSKey: cfg.BacalhauConfig.Orchestrator.TLS.ServerKey,
ServerTLSTimeout: cfg.BacalhauConfig.Orchestrator.TLS.ServerTimeout,
ServerSupportReverseProxy: cfg.BacalhauConfig.Orchestrator.SupportReverseProxy,
ClientTLSCACert: cfg.BacalhauConfig.Compute.TLS.CACert,
ComputeClientRequireTLS: cfg.BacalhauConfig.Compute.TLS.RequireTLS,
}

if cfg.BacalhauConfig.Compute.Enabled && !cfg.BacalhauConfig.Orchestrator.Enabled {
Expand Down
8 changes: 8 additions & 0 deletions pkg/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2400,6 +2400,10 @@ const docTemplate = `{
"CACert": {
"description": "CACert specifies the CA file path that the compute node trusts when connecting to orchestrator.",
"type": "string"
},
"RequireTLS": {
"description": "RequireTLS specifies if the compute node enforces encrypted communication with orchestrator.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -2714,6 +2718,10 @@ const docTemplate = `{
"Scheduler": {
"$ref": "#/definitions/types.Scheduler"
},
"SupportReverseProxy": {
"description": "SupportReverseProxy configures the orchestrator node to run behind a reverse proxy",
"type": "boolean"
},
"TLS": {
"description": "TLS specifies the TLS related configuration on the orchestrator for when compute nodes need to connect.",
"allOf": [
Expand Down
8 changes: 8 additions & 0 deletions pkg/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,10 @@
"CACert": {
"description": "CACert specifies the CA file path that the compute node trusts when connecting to orchestrator.",
"type": "string"
},
"RequireTLS": {
"description": "RequireTLS specifies if the compute node enforces encrypted communication with orchestrator.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -2710,6 +2714,10 @@
"Scheduler": {
"$ref": "#/definitions/types.Scheduler"
},
"SupportReverseProxy": {
"description": "SupportReverseProxy configures the orchestrator node to run behind a reverse proxy",
"type": "boolean"
},
"TLS": {
"description": "TLS specifies the TLS related configuration on the orchestrator for when compute nodes need to connect.",
"allOf": [
Expand Down
124 changes: 124 additions & 0 deletions test_integration/9_orchestrator_behind_reverse_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package test_integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"bacalhau/integration_tests/utils"

"github.com/google/uuid"
"github.com/stretchr/testify/suite"
)

type OrchestratorBehindReverseProxySuite struct {
BaseDockerComposeTestSuite
}

func NewOrchestratorBehindReverseProxySuite() *OrchestratorBehindReverseProxySuite {
s := &OrchestratorBehindReverseProxySuite{}
s.GlobalRunIdentifier = globalTestExecutionId
s.SuiteRunIdentifier = strings.ToLower(strings.Split(uuid.New().String(), "-")[0])
return s
}

func (s *OrchestratorBehindReverseProxySuite) SetupSuite() {
// In this test suite, the orchestrator is running behind a reverse proxy, and all
// the NATS traffic between orchestrator and compute Node go through a real reverse proxy (Traefik)

rawDockerComposeFilePath := "./common_assets/docker_compose_files/orchestrator-compute-traefik-custom-startup.yml"
s.Context, s.Cancel = context.WithCancel(context.Background())

traefikConfigFile := s.commonAssets("nodes_configs/9_traefik_static_config.yaml")
traefikStartCommand := fmt.Sprintf("--configFile=%s", traefikConfigFile)

orchestratorConfigFile := s.commonAssets("nodes_configs/9_orchestrator_node_behind_reverse_proxy.yaml")
orchestratorStartCommand := fmt.Sprintf("bacalhau serve --config=%s", orchestratorConfigFile)

computeConfigFile := s.commonAssets("nodes_configs/9_compute_node_with_enforced_tls_nats.yaml")
computeStartCommand := fmt.Sprintf("bacalhau serve --config=%s", computeConfigFile)
extraRenderingData := map[string]interface{}{
"OrchestratorStartCommand": orchestratorStartCommand,
"ComputeStartCommand": computeStartCommand,
"TraefikStartCommand": traefikStartCommand,
}
s.BaseDockerComposeTestSuite.SetupSuite(rawDockerComposeFilePath, extraRenderingData)
}

func (s *OrchestratorBehindReverseProxySuite) TearDownSuite() {
s.T().Log("Tearing down [Test Suite] in OrchestratorBehindReverseProxySuite...")
s.BaseDockerComposeTestSuite.TearDownSuite()
}

func (s *OrchestratorBehindReverseProxySuite) TestRunHelloWorldJobWithOrchestratorBehindReverseProxy() {
result, err := s.executeCommandInDefaultJumpbox(
[]string{
"bacalhau",
"job",
"run",
"--wait=false",
"--id-only",
"/bacalhau_integration_tests/common_assets/job_specs/hello_world.yml",
})
s.Require().NoError(err)

jobID, err := utils.ExtractJobIDFromShortOutput(result)
s.Require().NoError(err)

_, err = s.waitForJobToComplete(jobID, 30*time.Second)
s.Require().NoError(err)

resultDescription, err := s.executeCommandInDefaultJumpbox([]string{"bacalhau", "job", "describe", jobID})
s.Require().NoError(err)
s.Require().Contains(resultDescription, "hello bacalhau world", resultDescription)
}

func (s *OrchestratorBehindReverseProxySuite) TestNatsConnectionWillFailWithoutRequireTLS() {
_, err := s.executeCommandInDefaultJumpbox(
[]string{
"nats",
"--server=nats://i_am_very_secret_token@bacalhau-traefik-node:4222",
"--no-tlsfirst",
"pub",
"node.info",
"helloworld",
})
s.Require().Error(err)
s.Require().ErrorContains(err, "error: read tcp")
s.Require().ErrorContains(err, "timeout")
}

func (s *OrchestratorBehindReverseProxySuite) TestNatsTLSConnectionWillFailWithoutGoingThroughReverseProxy() {
_, err := s.executeCommandInDefaultJumpbox(
[]string{
"nats",
"--server=nats://i_am_very_secret_token@bacalhau-orchestrator-node:4222",
"--tlsca=/bacalhau_integration_tests/common_assets/certificates/nats_custom/nats_root_ca.crt",
"--tlsfirst",
"pub",
"node.info",
"helloworld",
})
s.Require().Error(err)
s.Require().ErrorContains(err, "error: tls: first record does not look like a TLS handshake")
}

func (s *OrchestratorBehindReverseProxySuite) TestNatsConnectionWillSucceedWithRequireTLS() {
result, err := s.executeCommandInDefaultJumpbox(
[]string{
"nats",
"--server=nats://i_am_very_secret_token@bacalhau-traefik-node:4222",
"--tlsfirst",
"pub",
"node.info",
"helloworld",
})
s.Require().NoError(err)
s.Require().Contains(result, `Published 10 bytes to "node.info"`)
}

func TestOrchestratorBehindReverseProxySuite(t *testing.T) {
suite.Run(t, NewOrchestratorBehindReverseProxySuite())
}
1 change: 1 addition & 0 deletions test_integration/base_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (s *BaseDockerComposeTestSuite) SetupSuite(dockerComposeFilePath string, re
"OrchestratorImageName": fmt.Sprintf("bacalhau-test-orchestrator-%s:%s", s.GlobalRunIdentifier, s.GlobalRunIdentifier),
"ComputeImageName": fmt.Sprintf("bacalhau-test-compute-%s:%s", s.GlobalRunIdentifier, s.GlobalRunIdentifier),
"JumpboxImageName": fmt.Sprintf("bacalhau-test-jumpbox-%s:%s", s.GlobalRunIdentifier, s.GlobalRunIdentifier),
"TraefikImageName": fmt.Sprintf("bacalhau-test-traefik-%s:%s", s.GlobalRunIdentifier, s.GlobalRunIdentifier),
}

// Merge Rendering Data
Expand Down
Loading

0 comments on commit dca8719

Please sign in to comment.