diff --git a/src/StatsHttpd.inl b/src/StatsHttpd.inl index 02b5adc79..ea3207bbd 100644 --- a/src/StatsHttpd.inl +++ b/src/StatsHttpd.inl @@ -1090,6 +1090,9 @@ void StatsServerT::runThreadConsume() { rkmessage = kafkaConsumer_.consumer(kTimeoutMs); if (rkmessage != nullptr) { + // record the latest time that got a non-empty message + lastCleanTime = time(nullptr); + // consume share log (lastShareTime_ will be updated) consumeShareLog(rkmessage); rd_kafka_message_destroy(rkmessage); /* Return message to rdkafka */ @@ -1110,8 +1113,8 @@ void StatsServerT::runThreadConsume() { } // the initialization state ends after no shares in 5 minutes - // LastCleanTime is used here because it records the time when the consuming thread starting - if (lastShareTime_ == 0 && lastCleanTime + 300 < time(nullptr)) { + // LastCleanTime is used here because it records the latest time that got a non-empty message + if (rkmessage == nullptr && lastCleanTime + 300 < time(nullptr)) { isInitializing_ = false; break; }