From dd9c181961b0fd04893148174ec62ea57eff73d7 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Thu, 15 Sep 2022 13:38:53 +0800 Subject: [PATCH] kafka: add User and Password #968 --- driver/common/taskconfig.go | 2 ++ driver/driver.go | 8 +++++--- driver/kafka/kafka2.go | 2 ++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/driver/common/taskconfig.go b/driver/common/taskconfig.go index 3ac4fe956..bdff140ac 100644 --- a/driver/common/taskconfig.go +++ b/driver/common/taskconfig.go @@ -143,6 +143,8 @@ type KafkaConfig struct { Topic string Converter string TimeZone string + User string + Password string MessageGroupMaxSize uint64 MessageGroupTimeout uint64 diff --git a/driver/driver.go b/driver/driver.go index 0277a17ea..8e67dd497 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -82,7 +82,7 @@ var ( hclspec.NewLiteral(`""`)), "memory": hclspec.NewAttr("memory", "string", false), "big_tx_max_jobs": hclspec.NewAttr("big_tx_max_jobs", "number", false), - "log_file": hclspec.NewDefault(hclspec.NewAttr("log_file", "string", false), + "log_file": hclspec.NewDefault(hclspec.NewAttr("log_file", "string", false), hclspec.NewLiteral(`"/var/log/dtle"`)), }) @@ -164,6 +164,8 @@ var ( "TopicWithSchemaTable": hclspec.NewDefault(hclspec.NewAttr("TopicWithSchemaTable", "bool", false), hclspec.NewLiteral(`true`)), "SchemaChangeTopic": hclspec.NewAttr("SchemaChangeTopic", "string", false), + "User": hclspec.NewAttr("User", "string", false), + "Password": hclspec.NewAttr("SchemaChangeTopic", "string", false), })), // Since each job has its own history, this should be smaller than MySQL default (25000). "DependencyHistorySize": hclspec.NewDefault(hclspec.NewAttr("DependencyHistorySize", "number", false), @@ -231,8 +233,8 @@ type Driver struct { // logger will log to the Nomad agent logger g.LoggerType - stand *stand.StanServer - apiServer *httprouter.Router + stand *stand.StanServer + apiServer *httprouter.Router setupApiServerFn func(logger g.LoggerType, driverConfig *DriverConfig) error config *DriverConfig diff --git a/driver/kafka/kafka2.go b/driver/kafka/kafka2.go index daac8c073..9da8636e2 100644 --- a/driver/kafka/kafka2.go +++ b/driver/kafka/kafka2.go @@ -73,6 +73,8 @@ func NewKafkaManager(kcfg *common.KafkaConfig) (*KafkaManager, error) { } config := sarama.NewConfig() config.Producer.Return.Successes = true + config.Net.SASL.User = kcfg.User + config.Net.SASL.Password = kcfg.Password k.producer, err = sarama.NewSyncProducer(kcfg.Brokers, config) if err != nil {