Skip to content

Commit

Permalink
Add AWS IAM auth support with aws_profile
Browse files Browse the repository at this point in the history
  • Loading branch information
Spragalas committed Feb 13, 2024
1 parent 83347c2 commit 7a82589
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 17 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ provider "kafka" {
| `skip_tls_verify` | Skip TLS verification. | `false` |
| `sasl_username` | Username for SASL authentication. | `""` |
| `sasl_password` | Password for SASL authentication. | `""` |
| `sasl_mechanism` | Mechanism for SASL authentication. Allowed values are plain, scram-sha512 and scram-sha256 | `plain` |
| `sasl_mechanism` | Mechanism for SASL authentication. Allowed values are plain, scram-sha512, scram-sha256, aws-iam | `plain` |
| `sasl_aws_region` | Only for aws-iam. An AWS region where Kafka cluster is deployed. | `""` |
| `sasl_aws_profile` | Only for aws-iam. An AWS profile to use while authenticating against AWS IAM. Can be left empty - will use default. | `""` |
## Resources
### `kafka_topic`
Expand Down
45 changes: 29 additions & 16 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,22 @@ type Config struct {
SASLPassword string
SASLMechanism string
SASLAWSRegion string
SASLAWSProfile string
}

type MSKAccessTokenProvider struct {
region string
region string
profile string
}

func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
token, _, err := signer.GenerateAuthToken(context.TODO(), m.region)
var token string
var err error
if m.profile != "" {
token, _, err = signer.GenerateAuthTokenFromProfile(context.TODO(), m.region, m.profile)
} else {
token, _, err = signer.GenerateAuthToken(context.TODO(), m.region)
}
return &sarama.AccessToken{Token: token}, err
}

Expand All @@ -61,13 +69,17 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
case "aws-iam":
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
region := c.SASLAWSRegion
profile := c.SASLAWSProfile
if region == "" {
region = os.Getenv("AWS_REGION")
}
if region == "" {
log.Fatalf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism")
return nil, fmt.Errorf("[ERROR] aws region must be configured or AWS_REGION environment variable must be set to use aws-iam sasl mechanism")
}
kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
region: region,
profile: profile,
}
kafkaConfig.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: region}
case "plain":
default:
log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\", \"aws-iam\" or \"plain\"", c.SASLMechanism)
Expand Down Expand Up @@ -201,18 +213,19 @@ func newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*t

func (config *Config) copyWithMaskedSensitiveValues() Config {
copy := Config{
config.BootstrapServers,
config.Timeout,
config.CACert,
config.ClientCert,
"*****",
"*****",
config.TLSEnabled,
config.SkipTLSVerify,
config.SASLAWSRegion,
config.SASLUsername,
"*****",
config.SASLMechanism,
BootstrapServers: config.BootstrapServers,
Timeout: config.Timeout,
CACert: config.CACert,
ClientCert: config.ClientCert,
ClientCertKey: "*****",
ClientCertKeyPassphrase: "*****",
TLSEnabled: config.TLSEnabled,
SkipTLSVerify: config.SkipTLSVerify,
SASLUsername: config.SASLUsername,
SASLPassword: "*****",
SASLMechanism: config.SASLMechanism,
SASLAWSRegion: config.SASLAWSRegion,
SASLAWSProfile: config.SASLAWSProfile,
}
return copy
}
77 changes: 77 additions & 0 deletions kafka/config_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package kafka

import (
"fmt"
"os"
"testing"

"github.com/IBM/sarama"
)

func loadFile(t *testing.T, file string) string {
Expand Down Expand Up @@ -117,3 +120,77 @@ func Test_newTLSConfig(t *testing.T) {
})
}
}

func Test_newKafkaConfig(t *testing.T) {
type AssertConfig func(config *sarama.Config, hasProfile bool) error
assertFn := func(config *sarama.Config, hasProfile bool) error {
if !config.Net.SASL.Enable {
return fmt.Errorf("SASL is not enabled")
}
if config.Net.SASL.Mechanism != "OAUTHBEARER" {
return fmt.Errorf("SALS mechanism is not 'OAUTHBEARER'")
}
if config.Net.SASL.TokenProvider == nil {
return fmt.Errorf("SALS TokenProvider not set")
}
tokenProvider := config.Net.SASL.TokenProvider.(*MSKAccessTokenProvider)
if tokenProvider.region == "" {
return fmt.Errorf("Token provider region not set")
}
if (tokenProvider.profile == "") == hasProfile {
return fmt.Errorf("Token provider profile not set")
}
return nil
}

tests := []struct {
name string
config Config
assertFn AssertConfig
wantErr bool
}{
{
name: "sasl mechanism - aws-iam no region provided",
config: Config{
SASLMechanism: "aws-iam",
},
wantErr: true,
},
{
name: "sasl mechanism - aws-iam with region provided",
config: Config{
SASLMechanism: "aws-iam",
SASLAWSRegion: "aws-region",
},
assertFn: assertFn,
wantErr: false,
},
{
name: "sasl mechanism - aws-iam with region and profile provided",
config: Config{
SASLMechanism: "aws-iam",
SASLAWSRegion: "aws-region",
SASLAWSProfile: "aws-profile",
},
assertFn: assertFn,
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc, err := tt.config.newKafkaConfig()
if (err != nil) != tt.wantErr {
t.Errorf("newKafkaConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.assertFn != nil {
assertErr := tt.assertFn(sc, tt.config.SASLAWSProfile != "")
if (assertErr != nil) != tt.wantErr {
t.Errorf("newKafkaConfig() error = %v, wantErr %v", assertErr, tt.wantErr)
return
}
}
})
}
}
7 changes: 7 additions & 0 deletions kafka/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func Provider() *schema.Provider {
DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_REGION", nil),
Description: "AWS region where MSK is deployed.",
},
"sasl_aws_profile": &schema.Schema{
Type: schema.TypeString,
Optional: true,
DefaultFunc: schema.EnvDefaultFunc("KAFKA_SASL_IAM_AWS_PROFILE", nil),
Description: "AWS profile to use for authentication.",
},
"sasl_username": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -138,6 +144,7 @@ func providerConfigure(d *schema.ResourceData) (interface{}, error) {
ClientCertKeyPassphrase: d.Get("client_key_passphrase").(string),
SkipTLSVerify: d.Get("skip_tls_verify").(bool),
SASLAWSRegion: d.Get("sasl_aws_region").(string),
SASLAWSProfile: d.Get("sasl_aws_profile").(string),
SASLUsername: d.Get("sasl_username").(string),
SASLPassword: d.Get("sasl_password").(string),
SASLMechanism: saslMechanism,
Expand Down

0 comments on commit 7a82589

Please sign in to comment.