diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go index 83197888..04639a05 100644 --- a/pkg/output/kafka/client.go +++ b/pkg/output/kafka/client.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "os" "strings" "github.com/IBM/sarama" @@ -34,6 +35,16 @@ var ( PartitionStrategyRandom PartitionStrategy = "random" ) +type SASLMechanism string + +var ( + SASLTypeOAuth SASLMechanism = "OAUTHBEARER" + SASLTypePlaintext SASLMechanism = "PLAIN" + SASLTypeSCRAMSHA256 SASLMechanism = "SCRAM-SHA-256" + SASLTypeSCRAMSHA512 SASLMechanism = "SCRAM-SHA-512" + SASLTypeGSSAPI SASLMechanism = "GSSAPI" +) + func NewSyncProducer(config *Config) (sarama.SyncProducer, error) { producerConfig, err := Init(config) if err != nil { @@ -50,8 +61,10 @@ func Init(config *Config) (*sarama.Config, error) { c.Producer.Flush.Frequency = config.FlushFrequency c.Producer.Retry.Max = config.MaxRetries c.Producer.Return.Successes = true + c.Net.TLS.Enable = config.TLS + c.Metadata.Full = false - if config.TLSClientConfig.CertificatePath != "" { + if config.TLSClientConfig != nil { clientCertificate, err := tls.LoadX509KeyPair(config.TLSClientConfig.CertificatePath, config.TLSClientConfig.KeyPath) if err != nil { return nil, fmt.Errorf("failed to read client certificate: %w", err) @@ -70,6 +83,38 @@ func Init(config *Config) (*sarama.Config, error) { c.Net.TLS.Config = tlsConfig } + if config.SASLConfig != nil { + var password string + if config.SASLConfig.Password != "" { + password = config.SASLConfig.Password + } else if config.SASLConfig.PasswordFile != "" { + passwordFile, err := os.ReadFile(config.SASLConfig.PasswordFile) + if err != nil { + return nil, fmt.Errorf("failed to read client password: %w", err) + } + + password = strings.TrimSpace(string(passwordFile)) + } + + c.Net.SASL.Enable = true + c.Net.SASL.Version = config.SASLConfig.Version + c.Net.SASL.User = config.SASLConfig.User + c.Net.SASL.Password = password + + switch config.SASLConfig.Mechanism { + case SASLTypeOAuth: + c.Net.SASL.Mechanism = sarama.SASLTypeOAuth + case SASLTypeSCRAMSHA256: + c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + case SASLTypeSCRAMSHA512: + c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + case SASLTypeGSSAPI: + c.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + default: + c.Net.SASL.Mechanism = sarama.SASLTypePlaintext + } + } + switch config.RequiredAcks { case RequiredAcksNone: c.Producer.RequiredAcks = sarama.NoResponse diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index e3c5da02..11cf7eb0 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -9,7 +9,8 @@ type Config struct { Brokers string `yaml:"brokers"` Topic string `yaml:"topic"` TLS bool `yaml:"tls" default:"false"` - TLSClientConfig TLSClientConfig `yaml:"tlsClientConfig"` + TLSClientConfig *TLSClientConfig `yaml:"tlsClientConfig"` + SASLConfig *SASLConfig `yaml:"sasl"` MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"` FlushMessages int `yaml:"flushMessages" default:"500"` @@ -26,6 +27,14 @@ type TLSClientConfig struct { CACertificate string `yaml:"caCertificate"` } +type SASLConfig struct { + Mechanism SASLMechanism `yaml:"mechanism" default:"PLAIN"` + Version int16 `yaml:"version" default:"1"` + User string `yaml:"user"` + Password string `yaml:"password"` + PasswordFile string `yaml:"passwordFile"` +} + func (c *Config) Validate() error { if c.Brokers == "" { return errors.New("brokers is required") @@ -35,17 +44,49 @@ func (c *Config) Validate() error { return errors.New("topic is required") } + if c.TLSClientConfig != nil && c.SASLConfig != nil { + return errors.New("only one of 'tlsClientConfig' and 'sasl' can be specified") + } + if err := c.TLSClientConfig.Validate(); err != nil { return err } + if err := c.SASLConfig.Validate(); err != nil { + return err + } + return nil } func (c *TLSClientConfig) Validate() error { + if c == nil { + return nil + } + if c.CertificatePath != "" && c.KeyPath == "" { return errors.New("client key is required") } return nil } + +func (c *SASLConfig) Validate() error { + if c == nil { + return nil + } + + if c.User == "" { + return errors.New("'user' is required") + } + + if c.Password != "" && c.PasswordFile != "" { + return errors.New("either 'password' or 'passwordFile' can be specified") + } + + if c.Password == "" && c.PasswordFile == "" { + return errors.New("'password' or 'passwordFile' is required") + } + + return nil +}