Skip to content

Commit

Permalink
Merge pull request #16 from tryfix/k-admin-fixes
Browse files Browse the repository at this point in the history
admin fetch info topic configs added
  • Loading branch information
gmbyapa authored Jun 14, 2020
2 parents 45de213 + 0779f6d commit 2d99e24
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
15 changes: 14 additions & 1 deletion admin/kafka_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (c *kafkaAdmin) FetchInfo(topics []string) (map[string]*Topic, error) {
}

for _, tp := range topicMeta {

var pts []Partition
for _, pt := range tp.Partitions {
pts = append(pts, Partition{
Expand All @@ -112,6 +111,20 @@ func (c *kafkaAdmin) FetchInfo(topics []string) (map[string]*Topic, error) {
if tp.Err != sarama.ErrNoError {
topicInfo[tp.Name].Error = tp.Err
}

// configs
confs, err := c.admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.TopicResource,
Name: tp.Name,
ConfigNames: []string{`cleanup.policy`, `min.insync.replicas`, `retention.ms`},
})
if err != nil {
return nil, err
}
topicInfo[tp.Name].ConfigEntries = map[string]string{}
for _, co := range confs {
topicInfo[tp.Name].ConfigEntries[co.Name] = co.Value
}
}

return topicInfo, nil
Expand Down
10 changes: 5 additions & 5 deletions admin/kafka_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
)


func TestKafkaAdmin_FetchInfo(t *testing.T) {
seedBroker := sarama.NewMockBroker(t, 1)
defer seedBroker.Close()
Expand All @@ -16,6 +15,7 @@ func TestKafkaAdmin_FetchInfo(t *testing.T) {
SetController(seedBroker.BrokerID()).
SetLeader("my_topic", 0, seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(t),
})

config := sarama.NewConfig()
Expand All @@ -27,7 +27,7 @@ func TestKafkaAdmin_FetchInfo(t *testing.T) {

topic := `my_topic`
admin := &kafkaAdmin{
admin: saramaAdmin,
admin: saramaAdmin,
logger: log.NewNoopLogger(),
}
tps, err := admin.FetchInfo([]string{topic})
Expand Down Expand Up @@ -65,13 +65,13 @@ func TestKafkaAdmin_CreateTopics(t *testing.T) {

topic := `my_topic`
admin := &kafkaAdmin{
admin: saramaAdmin,
admin: saramaAdmin,
logger: log.NewNoopLogger(),
}

err = admin.CreateTopics(map[string]*Topic{
topic: {
Name: topic,
Name: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestKafkaAdmin_DeleteTopics(t *testing.T) {

topic := `my_topic`
admin := &kafkaAdmin{
admin: saramaAdmin,
admin: saramaAdmin,
logger: log.NewNoopLogger(),
}

Expand Down
31 changes: 31 additions & 0 deletions examples/example_1/stream/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"github.com/google/uuid"
"github.com/tryfix/kstream/admin"
"github.com/tryfix/kstream/consumer"
"github.com/tryfix/kstream/data"
"github.com/tryfix/kstream/examples/example_1/encoders"
Expand Down Expand Up @@ -43,6 +44,8 @@ func Init() {
builderConfig.MetricsReporter = metrics.PrometheusReporter(metrics.ReporterConf{`streams`, `k_stream_test`, nil})
builderConfig.Logger = log.StdLogger

kAdmin := admin.NewKafkaAdmin(builderConfig.BootstrapServers, admin.WithLogger(log.StdLogger))
CreateTopics(kAdmin)
//builderConfig.Producer.Pool.NumOfWorkers = 1

builder := kstream.NewStreamBuilder(builderConfig)
Expand Down Expand Up @@ -92,6 +95,34 @@ func Init() {

}

func CreateTopics(kAdmin admin.KafkaAdmin) {
var topics = map[string]*admin.Topic{
`transaction`: {
NumPartitions: 2,
ReplicationFactor: 1,
},
`account_detail`: {
NumPartitions: 2,
ReplicationFactor: 1,
ConfigEntries: map[string]string{
`cleanup.policy`: `compact`,
},
},
`customer_profile`: {
NumPartitions: 2,
ReplicationFactor: 1,
ConfigEntries: map[string]string{
`cleanup.policy`: `compact`,
},
},
}

defer kAdmin.Close()
if err := kAdmin.CreateTopics(topics); err != nil {
log.Fatal(err)
}
}

func InitStreams(builder *kstream.StreamBuilder) []kstream.Stream {

transactionStream := initTransactionStream(builder)
Expand Down

0 comments on commit 2d99e24

Please sign in to comment.