From 7a82589e83193d0c9f332455c610501036683838 Mon Sep 17 00:00:00 2001 From: Tautvydas Eidietis Date: Tue, 13 Feb 2024 14:21:41 +0200 Subject: [PATCH] Add AWS IAM auth support with aws_profile --- README.md | 4 ++- kafka/config.go | 45 +++++++++++++++++--------- kafka/config_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++ kafka/provider.go | 7 ++++ 4 files changed, 116 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index fef4636b..61b7b461 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,9 @@ provider "kafka" { | `skip_tls_verify` | Skip TLS verification. | `false` | | `sasl_username` | Username for SASL authentication. | `""` | | `sasl_password` | Password for SASL authentication. | `""` | -| `sasl_mechanism` | Mechanism for SASL authentication. Allowed values are plain, scram-sha512 and scram-sha256 | `plain` | +| `sasl_mechanism` | Mechanism for SASL authentication. Allowed values are plain, scram-sha512, scram-sha256, aws-iam | `plain` | +| `sasl_aws_region` | Only for aws-iam. An AWS region where Kafka cluster is deployed. | `""` | +| `sasl_aws_profile` | Only for aws-iam. An AWS profile to use while authenticating against AWS IAM. Can be left empty - will use default. | `""` | ## Resources ### `kafka_topic` diff --git a/kafka/config.go b/kafka/config.go index 0d58de24..aa34179a 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -28,14 +28,22 @@ type Config struct { SASLPassword string SASLMechanism string SASLAWSRegion string + SASLAWSProfile string } type MSKAccessTokenProvider struct { - region string + region string + profile string } func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) { - token, _, err := signer.GenerateAuthToken(context.TODO(), m.region) + var token string + var err error + if m.profile != "" { + token, _, err = signer.GenerateAuthTokenFromProfile(context.TODO(), m.region, m.profile) + } else { + token, _, err = signer.GenerateAuthToken(context.TODO(), m.region) + } return &sarama.AccessToken{Token: token}, err } @@ -61,13 +69,17 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { case "aws-iam": kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth) region := c.SASLAWSRegion + profile := c.SASLAWSProfile if region == "" { region = os.Getenv("AWS_REGION") } if region == "" { - log.Fatalf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism") + return nil, fmt.Errorf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism") + } + kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{ + region: region, + profile: profile, } - kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: region} case "plain": default: log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", c.SASLMechanism) @@ -201,18 +213,19 @@ func newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*t func (config *Config) copyWithMaskedSensitiveValues() Config { copy := Config{ - config.BootstrapServers, - config.Timeout, - config.CACert, - config.ClientCert, - "*****", - "*****", - config.TLSEnabled, - config.SkipTLSVerify, - config.SASLAWSRegion, - config.SASLUsername, - "*****", - config.SASLMechanism, + BootstrapServers: config.BootstrapServers, + Timeout: config.Timeout, + CACert: config.CACert, + ClientCert: config.ClientCert, + ClientCertKey: "*****", + ClientCertKeyPassphrase: "*****", + TLSEnabled: config.TLSEnabled, + SkipTLSVerify: config.SkipTLSVerify, + SASLUsername: config.SASLUsername, + SASLPassword: "*****", + SASLMechanism: config.SASLMechanism, + SASLAWSRegion: config.SASLAWSRegion, + SASLAWSProfile: config.SASLAWSProfile, } return copy } diff --git a/kafka/config_test.go b/kafka/config_test.go index 944f7582..174a9cdc 100644 --- a/kafka/config_test.go +++ b/kafka/config_test.go @@ -1,8 +1,11 @@ package kafka import ( + "fmt" "os" "testing" + + "github.com/IBM/sarama" ) func loadFile(t *testing.T, file string) string { @@ -117,3 +120,77 @@ func Test_newTLSConfig(t *testing.T) { }) } } + +func Test_newKafkaConfig(t *testing.T) { + type AssertConfig func(config *sarama.Config, hasProfile bool) error + assertFn := func(config *sarama.Config, hasProfile bool) error { + if !config.Net.SASL.Enable { + return fmt.Errorf("SASL is not enabled") + } + if config.Net.SASL.Mechanism != "OAUTHBEARER" { + return fmt.Errorf("SALS mechanism is not 'OAUTHBEARER'") + } + if config.Net.SASL.TokenProvider == nil { + return fmt.Errorf("SALS TokenProvider not set") + } + tokenProvider := config.Net.SASL.TokenProvider.(*MSKAccessTokenProvider) + if tokenProvider.region == "" { + return fmt.Errorf("Token provider region not set") + } + if (tokenProvider.profile == "") == hasProfile { + return fmt.Errorf("Token provider profile not set") + } + return nil + } + + tests := []struct { + name string + config Config + assertFn AssertConfig + wantErr bool + }{ + { + name: "sasl mechanism - aws-iam no region provided", + config: Config{ + SASLMechanism: "aws-iam", + }, + wantErr: true, + }, + { + name: "sasl mechanism - aws-iam with region provided", + config: Config{ + SASLMechanism: "aws-iam", + SASLAWSRegion: "aws-region", + }, + assertFn: assertFn, + wantErr: false, + }, + { + name: "sasl mechanism - aws-iam with region and profile provided", + config: Config{ + SASLMechanism: "aws-iam", + SASLAWSRegion: "aws-region", + SASLAWSProfile: "aws-profile", + }, + assertFn: assertFn, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sc, err := tt.config.newKafkaConfig() + if (err != nil) != tt.wantErr { + t.Errorf("newKafkaConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.assertFn != nil { + assertErr := tt.assertFn(sc, tt.config.SASLAWSProfile != "") + if (assertErr != nil) != tt.wantErr { + t.Errorf("newKafkaConfig() error = %v, wantErr %v", assertErr, tt.wantErr) + return + } + } + }) + } +} diff --git a/kafka/provider.go b/kafka/provider.go index a071d5e4..51d96449 100644 --- a/kafka/provider.go +++ b/kafka/provider.go @@ -67,6 +67,12 @@ func Provider() *schema.Provider { DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_REGION", nil), Description: "AWS region where MSK is deployed.", }, + "sasl_aws_profile": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_PROFILE", nil), + Description: "AWS profile to use for authentication.", + }, "sasl_username": &schema.Schema{ Type: schema.TypeString, Optional: true, @@ -138,6 +144,7 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) { ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string), SkipTLSVerify: d.Get("skip_tls_verify").(bool), SASLAWSRegion: d.Get("sasl_aws_region").(string), + SASLAWSProfile: d.Get("sasl_aws_profile").(string), SASLUsername: d.Get("sasl_username").(string), SASLPassword: d.Get("sasl_password").(string), SASLMechanism: saslMechanism,