From b44b7286305cf6c61dfd3b500ed0dc4f1dac8b09 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 2 Mar 2022 15:05:01 +0100 Subject: [PATCH 01/16] Add proton internal consumer --- README.md | 28 +++++ cmd/consume.go | 129 +++++++++++++++++++ go.mod | 3 +- go.sum | 97 ++++++++++++-- internal/consumer/consumer.go | 189 ++++++++++++++++++++++++++++ internal/output/printer.go | 66 ++++++++++ internal/output/printer_test.go | 77 ++++++++++++ internal/protoparser/protoparser.go | 19 +++ 8 files changed, 597 insertions(+), 11 deletions(-) create mode 100644 cmd/consume.go create mode 100644 internal/consumer/consumer.go create mode 100644 internal/output/printer.go create mode 100644 internal/output/printer_test.go diff --git a/README.md b/README.md index 4de4127..38027dc 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,34 @@ Multiple proto files from a producer with input messages piped ### Usage with Kafka consumers +#### As a standalone consumer + +Proton can consume from Kafka directly. The syntax of all the parameters is kept as close as possible to the same from Kafkcat. + +The minimal configuration to run Proton as a standalone consumer is +```shell +proton consume -b my-broker -t my-topic -m ./my-schema.proto +``` +This would consume all the messages from the topic since its start and use default formatting. + +You can specify the start and/or the end offset timestamp in milliseconds. Both are optional. +```shell +proton consume -b my-broker -t my-topic -m ./my-schema.proto -s 1646218065015 -e 1646218099197 +``` +If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will + +You can specify the format of the output. +```shell +$ proton consume -b my-broker -t my-topic -m ./my-schema.proto -f "Time: %T \t %k\t%s" +# ... +Time: 1646218065015 key {"field1":"value1","field2":"value2"} +Time: 1646218099197 key {"field1":"value1","field2":"value2"} +# ... +``` +Run `proton consume -h` to see all the available formatting options. + +#### Piping from Kafkacat + Because Proto bytes can contain newlines (`\n`) and often do, we need to use a different marker to delimit the end of a message byte-stream and the beginning of the next. Proton expects an end of message marker, or will read to the end of the stream if not provided. diff --git a/cmd/consume.go b/cmd/consume.go new file mode 100644 index 0000000..6bd8aa2 --- /dev/null +++ b/cmd/consume.go @@ -0,0 +1,129 @@ +package cmd + +import ( + "bytes" + "context" + "log" + "os" + "os/signal" + + "github.com/Shopify/sarama" + "github.com/beatlabs/proton/v2/internal/consumer" + "github.com/beatlabs/proton/v2/internal/json" + "github.com/beatlabs/proton/v2/internal/output" + "github.com/beatlabs/proton/v2/internal/protoparser" + "github.com/spf13/cobra" +) + +// consumeCmd represents the consume command +var consumeCmd = &cobra.Command{ + Use: "consume", + Short: "consume from given topics", + Run: Run, +} + +var topic string +var broker string +var model string +var format string +var startTime, endTime int64 +var verbose bool + +func init() { + rootCmd.AddCommand(consumeCmd) + + consumeCmd.Flags().StringVarP(&broker, "broker", "b", "", "Broker URL to consume from") + if consumeCmd.MarkFlagRequired("broker") != nil { + log.Fatal("you must specify a a broker URL using the `-b ` option") + } + + consumeCmd.Flags().StringVarP(&topic, "topic", "t", "", "A topic to consume from") + if consumeCmd.MarkFlagRequired("topic") != nil { + log.Fatal("you must specify a topic to consume using the `-t ` option") + } + + consumeCmd.Flags().StringVarP(&model, "model", "m", "", "A path to a proto file an URL to it") + if consumeCmd.MarkFlagRequired("model") != nil { + log.Fatal("you must specify a proto file using the `-m ` option") + } + + consumeCmd.Flags().StringVarP(&format, "format", "f", "%T: %s", ` +A Kcat-like format string. Defaults to "%T: %s". +Format string tokens: + %s Message payload + %k Message key + %t Topic + %T Message timestamp (milliseconds since epoch UTC) + %Tf Message time formatted as RFC3339 # this is not supported by kcat + \n \r \t Newlines, tab + + // [not yet supported] \xXX \xNNN Any ASCII character + // [not yet supported] %S Message payload length (or -1 for NULL) + // [not yet supported] %R Message payload length (or -1 for NULL) serialized as a binary big endian 32-bit signed integer + // [not yet supported] %K Message key length (or -1 for NULL) + // [not yet supported] %h Message headers (n=v CSV) + // [not yet supported] %p Partition + // [not yet supported] %o Message offset +Example: + -f 'Key: %k, Time: %Tf \nValue: %s'`) + + consumeCmd.Flags().Int64VarP(&startTime, "start", "s", sarama.OffsetOldest, "Start offset timestamp milliseconds") + consumeCmd.Flags().Int64VarP(&endTime, "end", "e", sarama.OffsetNewest, "End offset timestamp milliseconds") + + consumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Whether to print out proton's debug messages") +} + +// Run runs this whole thing. +func Run(cmd *cobra.Command, _ []string) { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + protoParser, fileName, err := protoparser.New(ctx, model) + if err != nil { + log.Fatal(err) + } + + kafka, err := consumer.NewKafka(ctx, consumer.Cfg{ + URL: broker, + Topic: topic, + Start: startTime, + End: endTime, + Verbose: verbose, + }, &protoDecoder{json.Converter{ + Parser: protoParser, + Filename: fileName, + }}, output.NewFormatterPrinter(format, os.Stdout, os.Stderr)) + + if err != nil { + log.Fatal(err) + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + errCh := kafka.Run() + + select { + case err := <-errCh: + if err != nil { + log.Fatal(err) + } + case _ = <-signals: + break + } +} + +type protoDecoder struct { + json.Converter +} + +// Decode uses the existing json decoder and adapts it to this consumer. +func (p *protoDecoder) Decode(rawData []byte) (string, error) { + stream, errCh := p.ConvertStream(bytes.NewReader(rawData)) + select { + case msg := <-stream: + return string(msg), nil + case err := <-errCh: + return "", err + } +} diff --git a/go.mod b/go.mod index 8c1bbe5..9f5f9d9 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,13 @@ module github.com/beatlabs/proton/v2 go 1.15 require ( + github.com/Shopify/sarama v1.32.0 github.com/golang/protobuf v1.4.1 github.com/jhump/protoreflect v1.7.0 github.com/mitchellh/go-homedir v1.1.0 github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.1 - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.7.0 google.golang.org/protobuf v1.25.0 gopkg.in/h2non/gock.v1 v1.0.15 ) diff --git a/go.sum b/go.sum index 20ef8bd..e2373e7 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,10 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/Shopify/sarama v1.32.0 h1:P+RUjEaRU0GMMbYexGMDyrMkLhbbBVUVISDywi+IlFU= +github.com/Shopify/sarama v1.32.0/go.mod h1:+EmJJKZWVT/faR9RcOxJerP+LId4iWdQPBGLy1Y1Njs= +github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= +github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -35,15 +39,27 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns= +github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -72,6 +88,8 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -79,8 +97,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -90,6 +109,11 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -109,6 +133,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -120,6 +146,18 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/protoreflect v1.7.0 h1:qJ7piXPrjP3mDrfHf5ATkxfLix8ANs226vpo0aACOn0= github.com/jhump/protoreflect v1.7.0/go.mod h1:RZkzh7Hi9J7qT/sPlWnJ/UwZqCJvciFxKDA0UCeltSM= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -130,13 +168,18 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -164,6 +207,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -180,13 +225,18 @@ github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= @@ -211,12 +261,18 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -232,6 +288,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -266,8 +325,11 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -289,12 +351,22 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -320,8 +392,9 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -364,21 +437,25 @@ google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/h2non/gock.v1 v1.0.15 h1:SzLqcIlb/fDfg7UvukMpNcWsu7sI5tWwL+KCATZqks0= gopkg.in/h2non/gock.v1 v1.0.15/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go new file mode 100644 index 0000000..b78a4d0 --- /dev/null +++ b/internal/consumer/consumer.go @@ -0,0 +1,189 @@ +package consumer + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/Shopify/sarama" + "github.com/beatlabs/proton/v2/internal/output" + "github.com/beatlabs/proton/v2/internal/protoparser" +) + +// Cfg is the configuration of this consumer. +type Cfg struct { + URL string + Topic string + Start, End int64 + Verbose bool +} + +// Kafka is the consumer itself. +type Kafka struct { + ctx context.Context + + topic string + offsets []offsets + + verbose bool + + client sarama.Client + + decoder protoparser.Decoder + printer output.Printer +} + +type offsets struct { + partition int32 + start, end int64 +} + +// NewKafka returns a new instance of this consumer or an error if something isn't right. +func NewKafka(ctx context.Context, cfg Cfg, decoder protoparser.Decoder, printer output.Printer) (*Kafka, error) { + config := sarama.NewConfig() + config.ClientID = "proton-consumer" + config.Consumer.Return.Errors = true + config.Version = sarama.V0_11_0_0 + config.Consumer.IsolationLevel = sarama.ReadCommitted + + fmt.Println("Spinning the wheel... Connecting, gathering partitions data and stuff...") + + url := cfg.URL + if !strings.HasSuffix(url, ":9092") { + url = fmt.Sprintf("%s:9092", url) + } + client, err := sarama.NewClient([]string{url}, config) + if err != nil { + return nil, err + } + + var oo []offsets + topic := cfg.Topic + partitions, err := client.Partitions(topic) + if err != nil { + return nil, err + } + + for _, p := range partitions { + start := sarama.OffsetOldest + if cfg.Start != sarama.OffsetOldest { + start, err = client.GetOffset(topic, p, cfg.Start) + if err != nil { + fmt.Println(err) + return nil, err + } + } + + end := sarama.OffsetNewest + if cfg.End != sarama.OffsetNewest { + end, err = client.GetOffset(topic, p, cfg.End) + if err != nil { + fmt.Println(err) + return nil, err + } + } + + oo = append(oo, offsets{partition: p, start: start, end: end}) + } + + return &Kafka{ + ctx: ctx, + topic: topic, + offsets: oo, + verbose: cfg.Verbose, + client: client, + decoder: decoder, + printer: printer, + }, nil +} + +// Run runs the consumer and consumes everything according to its configuration. +// If any [infra] error happens before we even started, it gets written to the output error channel. +// If any [parsing] error happens during the consumption, it's given to a printer. +// When consumer reaches the configured end offset, it stops. Otherwise, it keeps waiting for new messages. +// All consumers will stop if the consumer context is cancelled. +func (k *Kafka) Run() <-chan error { + errCh := make(chan error) + + go func() { + wg := sync.WaitGroup{} + + consumer, err := sarama.NewConsumerFromClient(k.client) + if err != nil { + errCh <- err + return + } + + defer func() { + if err := consumer.Close(); err != nil { + errCh <- err + } + }() + + for _, o := range k.offsets { + wg.Add(1) + go func(topic string, o offsets) { + defer wg.Done() + + k.log(fmt.Sprintf("# Going to consume from %s until %s", offsetMsg(topic, o.partition, o.start), offsetMsg(topic, o.partition, o.end))) + + c, err := consumer.ConsumePartition(topic, o.partition, o.start) + if err != nil { + errCh <- err + return + } + + for { + select { + case <-k.ctx.Done(): + return + case message := <-c.Messages(): + msg, err := k.decoder.Decode(message.Value) + if err == nil { + k.printer.Print(output.Msg{ + Key: string(message.Key), + Value: msg, + Topic: topic, + Time: message.Timestamp, + }) + } else { + k.printer.PrintErr(err) + } + + if o.end == message.Offset { + k.log(fmt.Sprintf("# Reached stop timestamp for topic %s: exiting", offsetMsg(topic, o.partition, o.end))) + return + } + + if message.Offset+1 == c.HighWaterMarkOffset() { + k.log(fmt.Sprintf("# Reached stop timestamp for topic %s", offsetMsg(topic, o.partition, o.end))) + } + } + } + }(k.topic, o) + } + + wg.Wait() + close(errCh) + }() + + return errCh +} + +func offsetMsg(topic string, partition int32, offset int64) string { + om := fmt.Sprintf("%d", offset) + if offset == sarama.OffsetNewest { + om = "" + } + if offset == sarama.OffsetOldest { + om = "" + } + return fmt.Sprintf("%s [%d] at offset %s", topic, partition, om) +} + +func (k *Kafka) log(msg string) { + if k.verbose { + fmt.Println(msg) + } +} diff --git a/internal/output/printer.go b/internal/output/printer.go new file mode 100644 index 0000000..1deb7e4 --- /dev/null +++ b/internal/output/printer.go @@ -0,0 +1,66 @@ +package output + +import ( + "fmt" + "io" + "strconv" + "strings" + "time" +) + +// Printer is the interface that knows how to print different results of a kafka consumer. +type Printer interface { + Print(Msg) + PrintErr(error) +} + +// Msg is the successfully consumed Kafka message with some metadata for it. +type Msg struct { + Key, Value string + Topic string + Time time.Time +} + +// FormattedPrinter is a printer that knows how to parse the Kafkacat's format spec. +type FormattedPrinter struct { + format string + out, errOut io.Writer +} + +// NewFormatterPrinter returns a new instance of a printer that supports formatting similar to kafkacat's. +// Format tokens: +// %s Message payload +// %k Message key +// %t Topic +// %T Timestamp in milliseconds +// %Tf Timestamp formatted as RFC3339 +// \n \r Newlines +// \t Tab +func NewFormatterPrinter(format string, out, errOut io.Writer) *FormattedPrinter { + return &FormattedPrinter{ + format: format, + out: out, + errOut: errOut, + } +} + +// Print applies a specific format to a consumed Kafka message. +func (f *FormattedPrinter) Print(msg Msg) { + val := f.format + + val = strings.ReplaceAll(val, "\\t", "\t") + val = strings.ReplaceAll(val, "\\n", "\n") + val = strings.ReplaceAll(val, "\\r", "\r") + val = strings.ReplaceAll(val, "%s", msg.Value) + val = strings.ReplaceAll(val, "%k", msg.Key) + val = strings.ReplaceAll(val, "%t", msg.Topic) + val = strings.ReplaceAll(val, "%Tf", msg.Time.Format(time.RFC3339)) + val = strings.ReplaceAll(val, "%T", strconv.Itoa(int(msg.Time.UnixMilli()))) + + _, _ = fmt.Fprintln(f.out, val) +} + +// PrintErr knows how to print an error. +func (f *FormattedPrinter) PrintErr(err error) { + _, _ = fmt.Fprintln(f.errOut, err) +} diff --git a/internal/output/printer_test.go b/internal/output/printer_test.go new file mode 100644 index 0000000..906cece --- /dev/null +++ b/internal/output/printer_test.go @@ -0,0 +1,77 @@ +package output + +import ( + "bytes" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPrint(t *testing.T) { + currentTime := time.Unix(42, 123) + + tests := []struct { + name string + format string + msg Msg + expected string + }{ + { + name: "use all the possible formatting tokens", + format: "Topic: %t, Key: %k, \\n\\rMsg: %s, \\tTimestamp: %T, Time: %Tf", + msg: Msg{ + Key: "my-key", + Value: "my-val", + Topic: "my-topic", + Time: currentTime, + }, + expected: "Topic: my-topic, Key: my-key, \n\rMsg: my-val, \tTimestamp: 42000, Time: 1970-01-01T01:00:42+01:00\n", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given + buffer := bytes.NewBufferString("") + bufferErr := bytes.NewBufferString("") + printer := NewFormatterPrinter(test.format, buffer, bufferErr) + + // when + printer.Print(test.msg) + + // then + assert.Equal(t, test.expected, buffer.String()) + assert.Empty(t, bufferErr.String()) + }) + } +} + +func TestPrintErr(t *testing.T) { + tests := []struct { + name string + err error + expected string + }{ + { + name: "prints err", + err: errors.New("b00m"), + expected: "b00m\n", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given + buffer := bytes.NewBufferString("") + bufferErr := bytes.NewBufferString("") + printer := NewFormatterPrinter("irrelevant", buffer, bufferErr) + + // when + printer.PrintErr(test.err) + + // then + assert.Equal(t, test.expected, bufferErr.String()) + assert.Empty(t, buffer.String()) + }) + } +} diff --git a/internal/protoparser/protoparser.go b/internal/protoparser/protoparser.go index f40c7ae..866444c 100644 --- a/internal/protoparser/protoparser.go +++ b/internal/protoparser/protoparser.go @@ -11,6 +11,25 @@ import ( "github.com/jhump/protoreflect/desc/protoparse" ) +// Decoder is the interface that accepts bytes from Proto message and decodes it to a string. +type Decoder interface { + Decode([]byte) (string, error) +} + +// New initializes a proto parser. +func New(ctx context.Context, path string) (protoparse.Parser, string, error) { + u, err := url.Parse(path) + if err != nil { + return protoparse.Parser{}, path, err + } + + if u.Scheme == "" { + return NewFile(u.String()) + } + + return NewHTTP(ctx, u) +} + // NewFile initializes a proto parser from a local proto file. func NewFile(filePath string) (protoparse.Parser, string, error) { abs, err := fp.Abs(fp.Clean(filePath)) From 539f466c899446393ff514cb81aa5001a4e4efbb Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 2 Mar 2022 22:55:16 +0100 Subject: [PATCH 02/16] Make tool params more compatible with kafkacat --- README.md | 6 ++--- cmd/consume.go | 31 +++++++++++++++++++----- cmd/consume_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 9 deletions(-) create mode 100644 cmd/consume_test.go diff --git a/README.md b/README.md index 38027dc..bb49ce0 100644 --- a/README.md +++ b/README.md @@ -70,19 +70,19 @@ Proton can consume from Kafka directly. The syntax of all the parameters is kept The minimal configuration to run Proton as a standalone consumer is ```shell -proton consume -b my-broker -t my-topic -m ./my-schema.proto +proton consume -b my-broker -t my-topic --proto ./my-schema.proto ``` This would consume all the messages from the topic since its start and use default formatting. You can specify the start and/or the end offset timestamp in milliseconds. Both are optional. ```shell -proton consume -b my-broker -t my-topic -m ./my-schema.proto -s 1646218065015 -e 1646218099197 +proton consume -b my-broker -t my-topic --proto ./my-schema.proto -o s@1646218065015 -o e@1646218099197 ``` If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will You can specify the format of the output. ```shell -$ proton consume -b my-broker -t my-topic -m ./my-schema.proto -f "Time: %T \t %k\t%s" +$ proton consume -b my-broker -t my-topic --proto ./my-schema.proto -f "Time: %T \t %k\t%s" # ... Time: 1646218065015 key {"field1":"value1","field2":"value2"} Time: 1646218099197 key {"field1":"value1","field2":"value2"} diff --git a/cmd/consume.go b/cmd/consume.go index 6bd8aa2..53de49b 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -6,6 +6,8 @@ import ( "log" "os" "os/signal" + "strconv" + "strings" "github.com/Shopify/sarama" "github.com/beatlabs/proton/v2/internal/consumer" @@ -24,8 +26,9 @@ var consumeCmd = &cobra.Command{ var topic string var broker string -var model string +var proto string var format string +var offsets []string var startTime, endTime int64 var verbose bool @@ -42,8 +45,8 @@ func init() { log.Fatal("you must specify a topic to consume using the `-t ` option") } - consumeCmd.Flags().StringVarP(&model, "model", "m", "", "A path to a proto file an URL to it") - if consumeCmd.MarkFlagRequired("model") != nil { + consumeCmd.Flags().StringVarP(&proto, "proto", "", "", "A path to a proto file an URL to it") + if consumeCmd.MarkFlagRequired("proto") != nil { log.Fatal("you must specify a proto file using the `-m ` option") } @@ -67,18 +70,34 @@ Format string tokens: Example: -f 'Key: %k, Time: %Tf \nValue: %s'`) - consumeCmd.Flags().Int64VarP(&startTime, "start", "s", sarama.OffsetOldest, "Start offset timestamp milliseconds") - consumeCmd.Flags().Int64VarP(&endTime, "end", "e", sarama.OffsetNewest, "End offset timestamp milliseconds") + consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") + startTime, endTime = parseOffsets(offsets) consumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Whether to print out proton's debug messages") } +func parseOffsets(offsets []string) (int64, int64) { + return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest) +} + +func parseOffset(prefix string, offsets []string, defaultVal int64) int64 { + for _, offset := range offsets { + if strings.HasPrefix(offset, prefix) { + v, err := strconv.Atoi(offset[len(prefix):]) + if err == nil { + return int64(v) + } + } + } + return defaultVal +} + // Run runs this whole thing. func Run(cmd *cobra.Command, _ []string) { ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - protoParser, fileName, err := protoparser.New(ctx, model) + protoParser, fileName, err := protoparser.New(ctx, proto) if err != nil { log.Fatal(err) } diff --git a/cmd/consume_test.go b/cmd/consume_test.go new file mode 100644 index 0000000..b4e0f7c --- /dev/null +++ b/cmd/consume_test.go @@ -0,0 +1,59 @@ +package cmd + +import ( + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestParseOffsets(t *testing.T) { + tests := []struct { + name string + given []string + startTime, endTime int64 + }{ + { + name: "no offsets specified", + given: []string{}, + startTime: sarama.OffsetOldest, + endTime: sarama.OffsetNewest, + }, + { + name: "start offset specified", + given: []string{"s@24"}, + startTime: 24, + endTime: sarama.OffsetNewest, + }, + { + name: "end offset specified", + given: []string{"e@42"}, + startTime: sarama.OffsetOldest, + endTime: 42, + }, + { + name: "both offsets specified", + given: []string{"s@24", "e@42"}, + startTime: 24, + endTime: 42, + }, + { + name: "multiple offsets specified", + given: []string{"s@24", "e@42", "s@123", "e@321"}, + startTime: 24, + endTime: 42, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given + // TODO: fix the test + + // when + r1, r2 := parseOffsets(test.given) + + // then + assert.Equal(t, test.startTime, r1) + assert.Equal(t, test.endTime, r2) + }) + } +} From 266f22793c5ef9e743de40b355386c75f263a69b Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 2 Mar 2022 23:15:25 +0100 Subject: [PATCH 03/16] Support key grepping --- README.md | 6 ++++++ cmd/consume.go | 4 ++++ internal/consumer/consumer.go | 13 +++++++++++++ 3 files changed, 23 insertions(+) diff --git a/README.md b/README.md index bb49ce0..5e8cc85 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,12 @@ Time: 1646218099197 key {"field1":"value1","field2":"value2"} ``` Run `proton consume -h` to see all the available formatting options. +To filter out keys, you can use `--key ` option like in this example: +```shell +proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-key" +proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-k.*" +``` + #### Piping from Kafkacat Because Proto bytes can contain newlines (`\n`) and often do, diff --git a/cmd/consume.go b/cmd/consume.go index 53de49b..046b6d7 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -28,6 +28,7 @@ var topic string var broker string var proto string var format string +var keyGrep string var offsets []string var startTime, endTime int64 var verbose bool @@ -73,6 +74,8 @@ Example: consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") startTime, endTime = parseOffsets(offsets) + consumeCmd.Flags().StringVarP(&keyGrep, "key", "", ".*", "Grep RegExp for a key value") + consumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Whether to print out proton's debug messages") } @@ -108,6 +111,7 @@ func Run(cmd *cobra.Command, _ []string) { Start: startTime, End: endTime, Verbose: verbose, + KeyGrep: keyGrep, }, &protoDecoder{json.Converter{ Parser: protoParser, Filename: fileName, diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index b78a4d0..3331edb 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -3,6 +3,7 @@ package consumer import ( "context" "fmt" + "regexp" "strings" "sync" @@ -17,6 +18,7 @@ type Cfg struct { Topic string Start, End int64 Verbose bool + KeyGrep string } // Kafka is the consumer itself. @@ -26,6 +28,7 @@ type Kafka struct { topic string offsets []offsets + keyGrep *regexp.Regexp verbose bool client sarama.Client @@ -87,10 +90,16 @@ func NewKafka(ctx context.Context, cfg Cfg, decoder protoparser.Decoder, printer oo = append(oo, offsets{partition: p, start: start, end: end}) } + keyGrep, err := regexp.Compile(cfg.KeyGrep) + if err != nil { + return nil, err + } + return &Kafka{ ctx: ctx, topic: topic, offsets: oo, + keyGrep: keyGrep, verbose: cfg.Verbose, client: client, decoder: decoder, @@ -139,6 +148,10 @@ func (k *Kafka) Run() <-chan error { case <-k.ctx.Done(): return case message := <-c.Messages(): + if !k.keyGrep.Match(message.Key) { + continue + } + msg, err := k.decoder.Decode(message.Value) if err == nil { k.printer.Print(output.Msg{ From 265dd5a88148920d7a44a3ed0d2de75bb91cb32c Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Thu, 3 Mar 2022 11:05:22 +0100 Subject: [PATCH 04/16] Remove more stuff for a not-verbose mode --- internal/consumer/consumer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 3331edb..940471f 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -50,7 +50,9 @@ func NewKafka(ctx context.Context, cfg Cfg, decoder protoparser.Decoder, printer config.Version = sarama.V0_11_0_0 config.Consumer.IsolationLevel = sarama.ReadCommitted - fmt.Println("Spinning the wheel... Connecting, gathering partitions data and stuff...") + if cfg.Verbose { + fmt.Println("Spinning the wheel... Connecting, gathering partitions data and stuff...") + } url := cfg.URL if !strings.HasSuffix(url, ":9092") { From 1cdf2a15f3299bcd178378fe5bc92983a8bc01dc Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 9 Mar 2022 10:19:04 +0100 Subject: [PATCH 05/16] Fix specifying timestamp offsets --- README.md | 2 +- cmd/consume.go | 48 ++++++++++++++++++++++++++++-------------------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 5e8cc85..8baf03e 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ This would consume all the messages from the topic since its start and use defau You can specify the start and/or the end offset timestamp in milliseconds. Both are optional. ```shell -proton consume -b my-broker -t my-topic --proto ./my-schema.proto -o s@1646218065015 -o e@1646218099197 +proton consume -b my-broker -t my-topic --proto ./my-schema.proto -s 1646218065015 -e 1646218099197 ``` If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will diff --git a/cmd/consume.go b/cmd/consume.go index 046b6d7..4853164 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -6,8 +6,6 @@ import ( "log" "os" "os/signal" - "strconv" - "strings" "github.com/Shopify/sarama" "github.com/beatlabs/proton/v2/internal/consumer" @@ -29,7 +27,8 @@ var broker string var proto string var format string var keyGrep string -var offsets []string + +//var offsets []string var startTime, endTime int64 var verbose bool @@ -71,29 +70,38 @@ Format string tokens: Example: -f 'Key: %k, Time: %Tf \nValue: %s'`) - consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") - startTime, endTime = parseOffsets(offsets) + /* + FIXME: kafkacat's syntax allows specifying offsets using an array of `-o` flags. + Specifying `-o 123 -o 234` doesn't work with Cobra for an unknown reason but it actually should. + So before it's fixed, using the non-conventional `-s 123456789` and `-e 234567890`. It should be `-o s@123456789 -o e@234567890` instead. + */ + //consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") + //startTime, endTime = parseOffsets(offsets) + consumeCmd.Flags().Int64VarP(&startTime, "start", "s", sarama.OffsetOldest, "Start timestamp offset") + consumeCmd.Flags().Int64VarP(&endTime, "end", "e", sarama.OffsetNewest, "End timestamp offset") consumeCmd.Flags().StringVarP(&keyGrep, "key", "", ".*", "Grep RegExp for a key value") consumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Whether to print out proton's debug messages") } -func parseOffsets(offsets []string) (int64, int64) { - return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest) -} - -func parseOffset(prefix string, offsets []string, defaultVal int64) int64 { - for _, offset := range offsets { - if strings.HasPrefix(offset, prefix) { - v, err := strconv.Atoi(offset[len(prefix):]) - if err == nil { - return int64(v) - } - } - } - return defaultVal -} +//func parseOffsets(offsets []string) (int64, int64) { +// return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest) +//} +// +//func parseOffset(prefix string, offsets []string, defaultVal int64) int64 { +// fmt.Println(offsets) +// +// for _, offset := range offsets { +// if strings.HasPrefix(offset, prefix) { +// v, err := strconv.Atoi(offset[len(prefix):]) +// if err == nil { +// return int64(v) +// } +// } +// } +// return defaultVal +//} // Run runs this whole thing. func Run(cmd *cobra.Command, _ []string) { From 69a5d7b4852693bf4073c518df0e30ca1e19635f Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 9 Mar 2022 10:47:13 +0100 Subject: [PATCH 06/16] Config in a struct, not global vars --- cmd/consume.go | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/cmd/consume.go b/cmd/consume.go index 4853164..8816698 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -22,35 +22,36 @@ var consumeCmd = &cobra.Command{ Run: Run, } -var topic string -var broker string -var proto string -var format string -var keyGrep string +// ConsumeCfg is the config for everything this tool needs. +type ConsumeCfg struct { + consumerCfg *consumer.Cfg + model string + format string +} -//var offsets []string -var startTime, endTime int64 -var verbose bool +var consumeCfg = &ConsumeCfg{ + consumerCfg: &consumer.Cfg{}, +} func init() { rootCmd.AddCommand(consumeCmd) - consumeCmd.Flags().StringVarP(&broker, "broker", "b", "", "Broker URL to consume from") + consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.URL, "broker", "b", "", "Broker URL to consume from") if consumeCmd.MarkFlagRequired("broker") != nil { log.Fatal("you must specify a a broker URL using the `-b ` option") } - consumeCmd.Flags().StringVarP(&topic, "topic", "t", "", "A topic to consume from") + consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.Topic, "topic", "t", "", "A topic to consume from") if consumeCmd.MarkFlagRequired("topic") != nil { log.Fatal("you must specify a topic to consume using the `-t ` option") } - consumeCmd.Flags().StringVarP(&proto, "proto", "", "", "A path to a proto file an URL to it") + consumeCmd.Flags().StringVarP(&consumeCfg.model, "proto", "", "", "A path to a proto file an URL to it") if consumeCmd.MarkFlagRequired("proto") != nil { log.Fatal("you must specify a proto file using the `-m ` option") } - consumeCmd.Flags().StringVarP(&format, "format", "f", "%T: %s", ` + consumeCmd.Flags().StringVarP(&consumeCfg.format, "format", "f", "%T: %s", ` A Kcat-like format string. Defaults to "%T: %s". Format string tokens: %s Message payload @@ -77,12 +78,12 @@ Example: */ //consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") //startTime, endTime = parseOffsets(offsets) - consumeCmd.Flags().Int64VarP(&startTime, "start", "s", sarama.OffsetOldest, "Start timestamp offset") - consumeCmd.Flags().Int64VarP(&endTime, "end", "e", sarama.OffsetNewest, "End timestamp offset") + consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.Start, "start", "s", sarama.OffsetOldest, "Start timestamp offset") + consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.End, "end", "e", sarama.OffsetNewest, "End timestamp offset") - consumeCmd.Flags().StringVarP(&keyGrep, "key", "", ".*", "Grep RegExp for a key value") + consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.KeyGrep, "key", "", ".*", "Grep RegExp for a key value") - consumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Whether to print out proton's debug messages") + consumeCmd.Flags().BoolVarP(&consumeCfg.consumerCfg.Verbose, "verbose", "v", false, "Whether to print out proton's debug messages") } //func parseOffsets(offsets []string) (int64, int64) { @@ -108,22 +109,22 @@ func Run(cmd *cobra.Command, _ []string) { ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - protoParser, fileName, err := protoparser.New(ctx, proto) + protoParser, fileName, err := protoparser.New(ctx, consumeCfg.model) if err != nil { log.Fatal(err) } kafka, err := consumer.NewKafka(ctx, consumer.Cfg{ - URL: broker, - Topic: topic, - Start: startTime, - End: endTime, - Verbose: verbose, - KeyGrep: keyGrep, + URL: consumeCfg.consumerCfg.URL, + Topic: consumeCfg.consumerCfg.Topic, + Start: consumeCfg.consumerCfg.Start, + End: consumeCfg.consumerCfg.End, + Verbose: consumeCfg.consumerCfg.Verbose, + KeyGrep: consumeCfg.consumerCfg.KeyGrep, }, &protoDecoder{json.Converter{ Parser: protoParser, Filename: fileName, - }}, output.NewFormatterPrinter(format, os.Stdout, os.Stderr)) + }}, output.NewFormatterPrinter(consumeCfg.format, os.Stdout, os.Stderr)) if err != nil { log.Fatal(err) From 2c69c9ef18dcb5f127a622b48aad685aeef76643 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 9 Mar 2022 10:54:00 +0100 Subject: [PATCH 07/16] Replace port only if no port is specified --- internal/consumer/consumer.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 940471f..cbba147 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -3,8 +3,8 @@ package consumer import ( "context" "fmt" + "net/url" "regexp" - "strings" "sync" "github.com/Shopify/sarama" @@ -12,6 +12,8 @@ import ( "github.com/beatlabs/proton/v2/internal/protoparser" ) +const defaultPort = "9092" + // Cfg is the configuration of this consumer. type Cfg struct { URL string @@ -54,11 +56,17 @@ func NewKafka(ctx context.Context, cfg Cfg, decoder protoparser.Decoder, printer fmt.Println("Spinning the wheel... Connecting, gathering partitions data and stuff...") } - url := cfg.URL - if !strings.HasSuffix(url, ":9092") { - url = fmt.Sprintf("%s:9092", url) + parsed, err := url.Parse(cfg.URL) + if err != nil { + return nil, err + } + + broker := parsed.String() + if parsed.Port() == "" { + broker = fmt.Sprintf("%s:%s", broker, defaultPort) } - client, err := sarama.NewClient([]string{url}, config) + + client, err := sarama.NewClient([]string{broker}, config) if err != nil { return nil, err } From d0318bed48ff839a8a898fcf1c46c860a46f1134 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 9 Mar 2022 11:03:58 +0100 Subject: [PATCH 08/16] Remove commented code --- cmd/consume.go | 28 +++------------------ cmd/consume_test.go | 59 --------------------------------------------- 2 files changed, 3 insertions(+), 84 deletions(-) delete mode 100644 cmd/consume_test.go diff --git a/cmd/consume.go b/cmd/consume.go index 8816698..3230c68 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -71,13 +71,9 @@ Format string tokens: Example: -f 'Key: %k, Time: %Tf \nValue: %s'`) - /* - FIXME: kafkacat's syntax allows specifying offsets using an array of `-o` flags. - Specifying `-o 123 -o 234` doesn't work with Cobra for an unknown reason but it actually should. - So before it's fixed, using the non-conventional `-s 123456789` and `-e 234567890`. It should be `-o s@123456789 -o e@234567890` instead. - */ - //consumeCmd.Flags().StringSliceVarP(&offsets, "offsets", "o", []string{}, "Start and end timestamp offsets") - //startTime, endTime = parseOffsets(offsets) + // FIXME: kafkacat's syntax allows specifying offsets using an array of `-o` flags. + // Specifying `-o 123 -o 234` doesn't work with Cobra for an unknown reason but it actually should. + // So before it's fixed, using the non-conventional `-s 123456789` and `-e 234567890`. It should be `-o s@123456789 -o e@234567890` instead. consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.Start, "start", "s", sarama.OffsetOldest, "Start timestamp offset") consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.End, "end", "e", sarama.OffsetNewest, "End timestamp offset") @@ -86,24 +82,6 @@ Example: consumeCmd.Flags().BoolVarP(&consumeCfg.consumerCfg.Verbose, "verbose", "v", false, "Whether to print out proton's debug messages") } -//func parseOffsets(offsets []string) (int64, int64) { -// return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest) -//} -// -//func parseOffset(prefix string, offsets []string, defaultVal int64) int64 { -// fmt.Println(offsets) -// -// for _, offset := range offsets { -// if strings.HasPrefix(offset, prefix) { -// v, err := strconv.Atoi(offset[len(prefix):]) -// if err == nil { -// return int64(v) -// } -// } -// } -// return defaultVal -//} - // Run runs this whole thing. func Run(cmd *cobra.Command, _ []string) { ctx, cancel := context.WithCancel(cmd.Context()) diff --git a/cmd/consume_test.go b/cmd/consume_test.go deleted file mode 100644 index b4e0f7c..0000000 --- a/cmd/consume_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package cmd - -import ( - "github.com/Shopify/sarama" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestParseOffsets(t *testing.T) { - tests := []struct { - name string - given []string - startTime, endTime int64 - }{ - { - name: "no offsets specified", - given: []string{}, - startTime: sarama.OffsetOldest, - endTime: sarama.OffsetNewest, - }, - { - name: "start offset specified", - given: []string{"s@24"}, - startTime: 24, - endTime: sarama.OffsetNewest, - }, - { - name: "end offset specified", - given: []string{"e@42"}, - startTime: sarama.OffsetOldest, - endTime: 42, - }, - { - name: "both offsets specified", - given: []string{"s@24", "e@42"}, - startTime: 24, - endTime: 42, - }, - { - name: "multiple offsets specified", - given: []string{"s@24", "e@42", "s@123", "e@321"}, - startTime: 24, - endTime: 42, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - // given - // TODO: fix the test - - // when - r1, r2 := parseOffsets(test.given) - - // then - assert.Equal(t, test.startTime, r1) - assert.Equal(t, test.endTime, r2) - }) - } -} From f78628accf2117898bc1b3a7c74b35b2e99702cb Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Thu, 10 Mar 2022 10:54:34 +0100 Subject: [PATCH 09/16] Fix consuming until offset when key grep is set --- cmd/consume.go | 4 +++- internal/consumer/consumer.go | 29 +++++++++++++++-------------- internal/output/printer.go | 11 ++++++++--- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/cmd/consume.go b/cmd/consume.go index 3230c68..7c2b116 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -51,12 +51,14 @@ func init() { log.Fatal("you must specify a proto file using the `-m ` option") } - consumeCmd.Flags().StringVarP(&consumeCfg.format, "format", "f", "%T: %s", ` + consumeCmd.Flags().StringVarP(&consumeCfg.format, "format", "f", "%Tf: %s", ` A Kcat-like format string. Defaults to "%T: %s". Format string tokens: %s Message payload %k Message key %t Topic + %p Partition + %o Offset %T Message timestamp (milliseconds since epoch UTC) %Tf Message time formatted as RFC3339 # this is not supported by kcat \n \r \t Newlines, tab diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index cbba147..352f6f9 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -54,6 +54,7 @@ func NewKafka(ctx context.Context, cfg Cfg, decoder protoparser.Decoder, printer if cfg.Verbose { fmt.Println("Spinning the wheel... Connecting, gathering partitions data and stuff...") + fmt.Println(fmt.Sprintf("Consuming from %s from timestamp %d until timestamp %d", cfg.Topic, cfg.Start, cfg.End)) } parsed, err := url.Parse(cfg.URL) @@ -158,20 +159,20 @@ func (k *Kafka) Run() <-chan error { case <-k.ctx.Done(): return case message := <-c.Messages(): - if !k.keyGrep.Match(message.Key) { - continue - } - - msg, err := k.decoder.Decode(message.Value) - if err == nil { - k.printer.Print(output.Msg{ - Key: string(message.Key), - Value: msg, - Topic: topic, - Time: message.Timestamp, - }) - } else { - k.printer.PrintErr(err) + if k.keyGrep.Match(message.Key) { + msg, err := k.decoder.Decode(message.Value) + if err == nil { + k.printer.Print(output.Msg{ + Key: string(message.Key), + Value: msg, + Topic: topic, + Partition: int(message.Partition), + Offset: int(message.Offset), + Time: message.Timestamp, + }) + } else { + k.printer.PrintErr(err) + } } if o.end == message.Offset { diff --git a/internal/output/printer.go b/internal/output/printer.go index 1deb7e4..d42ad96 100644 --- a/internal/output/printer.go +++ b/internal/output/printer.go @@ -16,9 +16,10 @@ type Printer interface { // Msg is the successfully consumed Kafka message with some metadata for it. type Msg struct { - Key, Value string - Topic string - Time time.Time + Key, Value string + Partition, Offset int + Topic string + Time time.Time } // FormattedPrinter is a printer that knows how to parse the Kafkacat's format spec. @@ -32,6 +33,8 @@ type FormattedPrinter struct { // %s Message payload // %k Message key // %t Topic +// %p Partition +// %o Offset // %T Timestamp in milliseconds // %Tf Timestamp formatted as RFC3339 // \n \r Newlines @@ -54,6 +57,8 @@ func (f *FormattedPrinter) Print(msg Msg) { val = strings.ReplaceAll(val, "%s", msg.Value) val = strings.ReplaceAll(val, "%k", msg.Key) val = strings.ReplaceAll(val, "%t", msg.Topic) + val = strings.ReplaceAll(val, "%p", fmt.Sprintf("%d", msg.Partition)) + val = strings.ReplaceAll(val, "%o", fmt.Sprintf("%d", msg.Offset)) val = strings.ReplaceAll(val, "%Tf", msg.Time.Format(time.RFC3339)) val = strings.ReplaceAll(val, "%T", strconv.Itoa(int(msg.Time.UnixMilli()))) From d3edacb73fafaa99d0b294061e432fb7dc525ba2 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy <4928787+azarakovskiy@users.noreply.github.com> Date: Wed, 23 Mar 2022 14:20:58 +0100 Subject: [PATCH 10/16] Finish the sentence Co-authored-by: Peter Klijn --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8baf03e..71d5a67 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ You can specify the start and/or the end offset timestamp in milliseconds. Both ```shell proton consume -b my-broker -t my-topic --proto ./my-schema.proto -s 1646218065015 -e 1646218099197 ``` -If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will +If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will keep consuming. You can specify the format of the output. ```shell From 83e258090b3d3c9c4fa126bbd3ab5cf170d9334d Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 23 Mar 2022 14:26:12 +0100 Subject: [PATCH 11/16] Add help text for proton consume --- README.md | 104 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 71d5a67..0132667 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ brew tap beatlabs/proton https://github.com/beatlabs/proton brew install proton ``` -## Usage +## Usage as a converter Protobuf to JSON ```shell script Usage: @@ -35,7 +35,7 @@ Flags: Defaults to the first message type in the Proton file if not specified ``` -## Examples +### Examples Proto file from URL with input message as argument ```shell script @@ -62,11 +62,77 @@ Multiple proto files from a producer with input messages piped ./testdata/producer.sh '--END--' | proton json -f ./testdata/addressbook.proto -m '--END--' ``` -### Usage with Kafka consumers +### Piping data from Kafkacat -#### As a standalone consumer +Because Proto bytes can contain newlines (`\n`) and often do, +we need to use a different marker to delimit the end of a message byte-stream and the beginning of the next. +Proton expects an end of message marker, or will read to the end of the stream if not provided. + +You can add markers at the end of each messae with tools like [kafkacat](https://github.com/edenhill/kcat), like so: + +```shell script +kcat -b my-broker:9092 -t my-topic -f '%s--END--' +``` + +You can consume messages and parse them with Proton by doing the following: + +```shell script +kcat -b my-broker:9092 -t my-topic -f '%s--END--' -o beginning | proton json -f ./my-schema.proto -m '--END--' +``` -Proton can consume from Kafka directly. The syntax of all the parameters is kept as close as possible to the same from Kafkcat. +**Don't see messages?** + +If you execute the above command, but you don't see messages until you stop the consumer, you might have to adjust your buffer settings: +You can do this with the `stdbuf` command. + +```shell script +stdbuf -o0 kcat -b my-broker:9092 -t my-topic -f '%s--END--' -o beginning | proton json -f ./my-schema.proto -m '--END--' +``` + +If you don't have `stdbuf`, you can install it via `brew install coreutils`. + +## Using proton as a standalone Kafka consumer + +Proton can consume from Kafka directly. The syntax of all the parameters is kept as close as possible to the same from Kafkacat. + +```shell +$ proton consume --help +consume from given topics + +Usage: + proton consume [flags] + +Flags: + -b, --broker string Broker URL to consume from + -e, --end int End timestamp offset (default -1) + -f, --format string + A Kcat-like format string. Defaults to "%T: %s". + Format string tokens: + %s Message payload + %k Message key + %t Topic + %p Partition + %o Offset + %T Message timestamp (milliseconds since epoch UTC) + %Tf Message time formatted as RFC3339 # this is not supported by kcat + \n \r \t Newlines, tab + + // [not yet supported] \xXX \xNNN Any ASCII character + // [not yet supported] %S Message payload length (or -1 for NULL) + // [not yet supported] %R Message payload length (or -1 for NULL) serialized as a binary big endian 32-bit signed integer + // [not yet supported] %K Message key length (or -1 for NULL) + // [not yet supported] %h Message headers (n=v CSV) + // [not yet supported] %p Partition + // [not yet supported] %o Message offset + Example: + -f 'Key: %k, Time: %Tf \nValue: %s' (default "%Tf: %s") + -h, --help help for consume + --key string Grep RegExp for a key value (default ".*") + --proto string A path to a proto file an URL to it + -s, --start int Start timestamp offset (default -2) + -t, --topic string A topic to consume from + -v, --verbose Whether to print out proton's debug messages +``` The minimal configuration to run Proton as a standalone consumer is ```shell @@ -96,32 +162,4 @@ proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-key" proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-k.*" ``` -#### Piping from Kafkacat - -Because Proto bytes can contain newlines (`\n`) and often do, -we need to use a different marker to delimit the end of a message byte-stream and the beginning of the next. -Proton expects an end of message marker, or will read to the end of the stream if not provided. - -You can add markers at the end of each messae with tools like [kafkacat](https://github.com/edenhill/kcat), like so: - -```shell script -kcat -b my-broker:9092 -t my-topic -f '%s--END--' -``` - -You can consume messages and parse them with Proton by doing the following: - -```shell script -kcat -b my-broker:9092 -t my-topic -f '%s--END--' -o beginning | proton json -f ./my-schema.proto -m '--END--' -``` - -**Don't see messages?** - -If you execute the above command, but you don't see messages until you stop the consumer, you might have to adjust your buffer settings: -You can do this with the `stdbuf` command. - -```shell script -stdbuf -o0 kcat -b my-broker:9092 -t my-topic -f '%s--END--' -o beginning | proton json -f ./my-schema.proto -m '--END--' -``` - -If you don't have `stdbuf`, you can install it via `brew install coreutils`. From 5cc074bf8c858988d587f4b5697f19d88956a1e0 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 23 Mar 2022 14:28:22 +0100 Subject: [PATCH 12/16] Remove help text like "not supported in kcat" --- README.md | 8 -------- cmd/consume.go | 12 ++---------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 0132667..ac55f97 100644 --- a/README.md +++ b/README.md @@ -116,14 +116,6 @@ Flags: %T Message timestamp (milliseconds since epoch UTC) %Tf Message time formatted as RFC3339 # this is not supported by kcat \n \r \t Newlines, tab - - // [not yet supported] \xXX \xNNN Any ASCII character - // [not yet supported] %S Message payload length (or -1 for NULL) - // [not yet supported] %R Message payload length (or -1 for NULL) serialized as a binary big endian 32-bit signed integer - // [not yet supported] %K Message key length (or -1 for NULL) - // [not yet supported] %h Message headers (n=v CSV) - // [not yet supported] %p Partition - // [not yet supported] %o Message offset Example: -f 'Key: %k, Time: %Tf \nValue: %s' (default "%Tf: %s") -h, --help help for consume diff --git a/cmd/consume.go b/cmd/consume.go index 7c2b116..008ad98 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -60,16 +60,8 @@ Format string tokens: %p Partition %o Offset %T Message timestamp (milliseconds since epoch UTC) - %Tf Message time formatted as RFC3339 # this is not supported by kcat - \n \r \t Newlines, tab - - // [not yet supported] \xXX \xNNN Any ASCII character - // [not yet supported] %S Message payload length (or -1 for NULL) - // [not yet supported] %R Message payload length (or -1 for NULL) serialized as a binary big endian 32-bit signed integer - // [not yet supported] %K Message key length (or -1 for NULL) - // [not yet supported] %h Message headers (n=v CSV) - // [not yet supported] %p Partition - // [not yet supported] %o Message offset + %Tf Message time formatted as RFC3339 + \n \r \t Newlines, tab Example: -f 'Key: %k, Time: %Tf \nValue: %s'`) From 74c07602cd04605a5d2c59420d585c3946c49883 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 23 Mar 2022 14:33:42 +0100 Subject: [PATCH 13/16] Extract p=message processing to a function --- internal/consumer/consumer.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 352f6f9..9347be2 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -159,21 +159,7 @@ func (k *Kafka) Run() <-chan error { case <-k.ctx.Done(): return case message := <-c.Messages(): - if k.keyGrep.Match(message.Key) { - msg, err := k.decoder.Decode(message.Value) - if err == nil { - k.printer.Print(output.Msg{ - Key: string(message.Key), - Value: msg, - Topic: topic, - Partition: int(message.Partition), - Offset: int(message.Offset), - Time: message.Timestamp, - }) - } else { - k.printer.PrintErr(err) - } - } + k.processMessage(message) if o.end == message.Offset { k.log(fmt.Sprintf("# Reached stop timestamp for topic %s: exiting", offsetMsg(topic, o.partition, o.end))) @@ -195,6 +181,24 @@ func (k *Kafka) Run() <-chan error { return errCh } +func (k *Kafka) processMessage(message *sarama.ConsumerMessage) { + if k.keyGrep.Match(message.Key) { + msg, err := k.decoder.Decode(message.Value) + if err == nil { + k.printer.Print(output.Msg{ + Key: string(message.Key), + Value: msg, + Topic: message.Topic, + Partition: int(message.Partition), + Offset: int(message.Offset), + Time: message.Timestamp, + }) + } else { + k.printer.PrintErr(err) + } + } +} + func offsetMsg(topic string, partition int32, offset int64) string { om := fmt.Sprintf("%d", offset) if offset == sarama.OffsetNewest { From ba90f93e70fae9aeaf3160df96f92ff991eaf220 Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Wed, 23 Mar 2022 15:23:13 +0100 Subject: [PATCH 14/16] Fix offset parsing --- README.md | 45 +++++++++++++++++++---------------- cmd/consume.go | 53 ++++++++++++++++++++++++++--------------- cmd/consume_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 40 deletions(-) create mode 100644 cmd/consume_test.go diff --git a/README.md b/README.md index ac55f97..5febd6a 100644 --- a/README.md +++ b/README.md @@ -103,27 +103,30 @@ Usage: proton consume [flags] Flags: - -b, --broker string Broker URL to consume from - -e, --end int End timestamp offset (default -1) + -b, --broker string Broker URL to consume from -f, --format string - A Kcat-like format string. Defaults to "%T: %s". - Format string tokens: - %s Message payload - %k Message key - %t Topic - %p Partition - %o Offset - %T Message timestamp (milliseconds since epoch UTC) - %Tf Message time formatted as RFC3339 # this is not supported by kcat - \n \r \t Newlines, tab - Example: - -f 'Key: %k, Time: %Tf \nValue: %s' (default "%Tf: %s") - -h, --help help for consume - --key string Grep RegExp for a key value (default ".*") - --proto string A path to a proto file an URL to it - -s, --start int Start timestamp offset (default -2) - -t, --topic string A topic to consume from - -v, --verbose Whether to print out proton's debug messages + A Kcat-like format string. Defaults to "%T: %s". + Format string tokens: + %s Message payload + %k Message key + %t Topic + %p Partition + %o Offset + %T Message timestamp (milliseconds since epoch UTC) + %Tf Message time formatted as RFC3339 + \n \r \t Newlines, tab + Example: + -f 'Key: %k, Time: %Tf \nValue: %s' (default "%Tf: %s") + -h, --help help for consume + --key string Grep RegExp for a key value (default ".*") + -o, --offsets strings + Offset to start consuming from + s@ (timestamp in ms to start at) + e@ (timestamp in ms to stop at (not included)) + + --proto string A path to a proto file an URL to it + -t, --topic string A topic to consume from + -v, --verbose Whether to print out proton's debug messages ``` The minimal configuration to run Proton as a standalone consumer is @@ -134,7 +137,7 @@ This would consume all the messages from the topic since its start and use defau You can specify the start and/or the end offset timestamp in milliseconds. Both are optional. ```shell -proton consume -b my-broker -t my-topic --proto ./my-schema.proto -s 1646218065015 -e 1646218099197 +proton consume -b my-broker -t my-topic --proto ./my-schema.proto -o s@1646218065015 -o e@1646218099197 ``` If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will keep consuming. diff --git a/cmd/consume.go b/cmd/consume.go index 008ad98..5c2d976 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -6,6 +6,8 @@ import ( "log" "os" "os/signal" + "strconv" + "strings" "github.com/Shopify/sarama" "github.com/beatlabs/proton/v2/internal/consumer" @@ -24,13 +26,14 @@ var consumeCmd = &cobra.Command{ // ConsumeCfg is the config for everything this tool needs. type ConsumeCfg struct { - consumerCfg *consumer.Cfg + consumerCfg consumer.Cfg + offsets []string model string format string } var consumeCfg = &ConsumeCfg{ - consumerCfg: &consumer.Cfg{}, + consumerCfg: consumer.Cfg{}, } func init() { @@ -61,15 +64,15 @@ Format string tokens: %o Offset %T Message timestamp (milliseconds since epoch UTC) %Tf Message time formatted as RFC3339 - \n \r \t Newlines, tab + \n \r \t Newlines, tab Example: -f 'Key: %k, Time: %Tf \nValue: %s'`) - // FIXME: kafkacat's syntax allows specifying offsets using an array of `-o` flags. - // Specifying `-o 123 -o 234` doesn't work with Cobra for an unknown reason but it actually should. - // So before it's fixed, using the non-conventional `-s 123456789` and `-e 234567890`. It should be `-o s@123456789 -o e@234567890` instead. - consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.Start, "start", "s", sarama.OffsetOldest, "Start timestamp offset") - consumeCmd.Flags().Int64VarP(&consumeCfg.consumerCfg.End, "end", "e", sarama.OffsetNewest, "End timestamp offset") + consumeCmd.Flags().StringSliceVarP(&consumeCfg.offsets, "offsets", "o", []string{}, ` +Offset to start consuming from + s@ (timestamp in ms to start at) + e@ (timestamp in ms to stop at (not included)) +`) consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.KeyGrep, "key", "", ".*", "Grep RegExp for a key value") @@ -86,17 +89,13 @@ func Run(cmd *cobra.Command, _ []string) { log.Fatal(err) } - kafka, err := consumer.NewKafka(ctx, consumer.Cfg{ - URL: consumeCfg.consumerCfg.URL, - Topic: consumeCfg.consumerCfg.Topic, - Start: consumeCfg.consumerCfg.Start, - End: consumeCfg.consumerCfg.End, - Verbose: consumeCfg.consumerCfg.Verbose, - KeyGrep: consumeCfg.consumerCfg.KeyGrep, - }, &protoDecoder{json.Converter{ - Parser: protoParser, - Filename: fileName, - }}, output.NewFormatterPrinter(consumeCfg.format, os.Stdout, os.Stderr)) + consumeCfg.consumerCfg.Start, consumeCfg.consumerCfg.End = parseOffsets(consumeCfg.offsets) + + kafka, err := consumer.NewKafka(ctx, consumeCfg.consumerCfg, + &protoDecoder{json.Converter{ + Parser: protoParser, + Filename: fileName, + }}, output.NewFormatterPrinter(consumeCfg.format, os.Stdout, os.Stderr)) if err != nil { log.Fatal(err) @@ -131,3 +130,19 @@ func (p *protoDecoder) Decode(rawData []byte) (string, error) { return "", err } } + +func parseOffsets(offsets []string) (int64, int64) { + return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest) +} + +func parseOffset(prefix string, offsets []string, defaultVal int64) int64 { + for _, offset := range offsets { + if strings.HasPrefix(offset, prefix) { + v, err := strconv.Atoi(offset[len(prefix):]) + if err == nil { + return int64(v) + } + } + } + return defaultVal +} diff --git a/cmd/consume_test.go b/cmd/consume_test.go new file mode 100644 index 0000000..fc593ad --- /dev/null +++ b/cmd/consume_test.go @@ -0,0 +1,58 @@ +package cmd + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" +) + +func TestParseOffsets(t *testing.T) { + tests := []struct { + name string + given []string + startTime, endTime int64 + }{ + { + name: "no offsets specified", + given: []string{}, + startTime: sarama.OffsetOldest, + endTime: sarama.OffsetNewest, + }, + { + name: "start offset specified", + given: []string{"s@24"}, + startTime: 24, + endTime: sarama.OffsetNewest, + }, + { + name: "end offset specified", + given: []string{"e@42"}, + startTime: sarama.OffsetOldest, + endTime: 42, + }, + { + name: "both offsets specified", + given: []string{"s@24", "e@42"}, + startTime: 24, + endTime: 42, + }, + { + name: "multiple offsets specified", + given: []string{"s@24", "e@42", "s@123", "e@321"}, + startTime: 24, + endTime: 42, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given + // when + r1, r2 := parseOffsets(test.given) + + // then + assert.Equal(t, test.startTime, r1) + assert.Equal(t, test.endTime, r2) + }) + } +} From 351f053f6cf7b8da4b653c454baab7a1c714487e Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Fri, 25 Mar 2022 11:03:09 +0100 Subject: [PATCH 15/16] bump go version --- .github/workflows/build.yml | 4 ++-- .github/workflows/release.yml | 4 ++-- go.mod | 40 ++++++++++++++++++++++++++++++++++- go.sum | 10 --------- 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4b965e8..3e38791 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,10 +16,10 @@ jobs: - name: Check out code into the Go module directory uses: actions/checkout@master - - name: Set up Go (1.15) + - name: Set up Go (1.17) uses: actions/setup-go@v1 with: - go-version: 1.15 + go-version: 1.17 - name: Linter run: | diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f11424e..00ab57f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,10 +17,10 @@ jobs: - name: Unshallow run: git fetch --prune --unshallow - - name: Set up Go (1.15) + - name: Set up Go (1.17) uses: actions/setup-go@v1 with: - go-version: 1.15 + go-version: 1.17 - name: Run GoReleaser uses: goreleaser/goreleaser-action@master diff --git a/go.mod b/go.mod index 9f5f9d9..288b976 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/beatlabs/proton/v2 -go 1.15 +go 1.17 require ( github.com/Shopify/sarama v1.32.0 @@ -13,3 +13,41 @@ require ( google.golang.org/protobuf v1.25.0 gopkg.in/h2non/gock.v1 v1.0.15 ) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/fsnotify/fsnotify v1.4.7 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.14.4 // indirect + github.com/magiconair/properties v1.8.1 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect + github.com/pelletier/go-toml v1.2.0 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/spf13/afero v1.1.2 // indirect + github.com/spf13/cast v1.3.0 // indirect + github.com/spf13/jwalterweatherman v1.0.0 // indirect + github.com/spf13/pflag v1.0.3 // indirect + github.com/subosito/gotenv v1.2.0 // indirect + golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/text v0.3.7 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect + gopkg.in/ini.v1 v1.51.0 // indirect + gopkg.in/yaml.v2 v2.2.4 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) diff --git a/go.sum b/go.sum index e2373e7..1208547 100644 --- a/go.sum +++ b/go.sum @@ -69,9 +69,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -79,7 +77,6 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -95,7 +92,6 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= @@ -110,9 +106,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= -github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -210,7 +204,6 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -259,7 +252,6 @@ github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -413,7 +405,6 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= -google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= @@ -421,7 +412,6 @@ google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEd google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= -google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= From 30163b3b256751b3ea4fcaa8e7f8c91dd9e4867f Mon Sep 17 00:00:00 2001 From: Alexey Zarakovskiy Date: Fri, 25 Mar 2022 12:06:43 +0100 Subject: [PATCH 16/16] fix timezone in a test --- internal/output/printer_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/output/printer_test.go b/internal/output/printer_test.go index 906cece..b0569bc 100644 --- a/internal/output/printer_test.go +++ b/internal/output/printer_test.go @@ -3,6 +3,7 @@ package output import ( "bytes" "errors" + "fmt" "testing" "time" @@ -11,6 +12,7 @@ import ( func TestPrint(t *testing.T) { currentTime := time.Unix(42, 123) + timeFormatted := currentTime.Format(time.RFC3339) tests := []struct { name string @@ -27,7 +29,7 @@ func TestPrint(t *testing.T) { Topic: "my-topic", Time: currentTime, }, - expected: "Topic: my-topic, Key: my-key, \n\rMsg: my-val, \tTimestamp: 42000, Time: 1970-01-01T01:00:42+01:00\n", + expected: fmt.Sprintf("Topic: my-topic, Key: my-key, \n\rMsg: my-val, \tTimestamp: 42000, Time: %s\n", timeFormatted), }, } for _, test := range tests {