Skip to content

Commit

Permalink
kafka: add User and Password #968
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Dec 12, 2022
1 parent ab25514 commit 768c046
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 2 additions & 0 deletions driver/common/taskconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ type KafkaConfig struct {
Topic string
Converter string
TimeZone string
User string
Password string
MessageGroupMaxSize uint64
MessageGroupTimeout uint64

Expand Down
8 changes: 5 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var (
hclspec.NewLiteral(`""`)),
"memory": hclspec.NewAttr("memory", "string", false),
"big_tx_max_jobs": hclspec.NewAttr("big_tx_max_jobs", "number", false),
"log_file": hclspec.NewDefault(hclspec.NewAttr("log_file", "string", false),
"log_file": hclspec.NewDefault(hclspec.NewAttr("log_file", "string", false),
hclspec.NewLiteral(`"/var/log/dtle"`)),
})

Expand Down Expand Up @@ -164,6 +164,8 @@ var (
"TopicWithSchemaTable": hclspec.NewDefault(hclspec.NewAttr("TopicWithSchemaTable", "bool", false),
hclspec.NewLiteral(`true`)),
"SchemaChangeTopic": hclspec.NewAttr("SchemaChangeTopic", "string", false),
"User": hclspec.NewAttr("User", "string", false),
"Password": hclspec.NewAttr("SchemaChangeTopic", "string", false),
})),
// Since each job has its own history, this should be smaller than MySQL default (25000).
"DependencyHistorySize": hclspec.NewDefault(hclspec.NewAttr("DependencyHistorySize", "number", false),
Expand Down Expand Up @@ -229,8 +231,8 @@ type Driver struct {
// logger will log to the Nomad agent
logger g.LoggerType

stand *stand.StanServer
apiServer *httprouter.Router
stand *stand.StanServer
apiServer *httprouter.Router
setupApiServerFn func(logger g.LoggerType, driverConfig *DriverConfig) error

config *DriverConfig
Expand Down
2 changes: 2 additions & 0 deletions driver/kafka/kafka2.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func NewKafkaManager(kcfg *common.KafkaConfig) (*KafkaManager, error) {
}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Net.SASL.User = kcfg.User
config.Net.SASL.Password = kcfg.Password

k.producer, err = sarama.NewSyncProducer(kcfg.Brokers, config)
if err != nil {
Expand Down

0 comments on commit 768c046

Please sign in to comment.