Skip to content

Commit

Permalink
Merge pull request #24 from tryfix/global_consumer_sync_stuck
Browse files Browse the repository at this point in the history
Global table sync gets stuck fixed
  • Loading branch information
gmbyapa authored May 25, 2022
2 parents da45145 + 70a614d commit 09904d1
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions consumer/partition_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,11 @@ MainLoop:
Headers: msg.Headers,
}

//if highWatermark == 0 || highWatermark-1 == msg.Offset {
if msg.Offset == highWatermark-1 {
if msg.Offset == highWatermark-1 ||
// TODO this is a workaround to avoid the last message
// being a control record(https://kafka.apache.org/documentation/#controlbatch)
// when using transactional batches
msg.Offset == highWatermark-2 {
c.consumerEvents <- &PartitionEnd{
tps: []TopicPartition{{
Topic: msg.Topic,
Expand Down

0 comments on commit 09904d1

Please sign in to comment.