diff --git a/CHANGELOG b/CHANGELOG index e9bc2a2b..c79cf4a0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +Version 1.7.1 (2024-04-26) +-------------------------- +Bump sbt-snowplow-release to 0.3.2 +Repeater: PubSub ack extensions should match backoff delay (#373) + Version 1.7.0 (2023-11-20) -------------------------- Set GCP user agent header for BQ and Pubsub (#363) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala index 9493b880..a7aba152 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Flow.scala @@ -100,8 +100,11 @@ object Flow { .as(Inserted.asInstanceOf[InsertStatus]) .value } else { - Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >> - event.nack.as(Retry.asInstanceOf[InsertStatus].asRight) + Logger[F] + .debug( + s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Ignoring it so PubSub re-sends it later." + ) + .as(Retry.asInstanceOf[InsertStatus].asRight) } } } yield result diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala index 83a6a4b2..59297b35 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala @@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor import cats.effect._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import scala.concurrent.duration.DurationInt object Repeater extends IOApp { @@ -34,7 +35,8 @@ object Repeater extends IOApp { resources.env.projectId, resources.env.config.input.subscription, resources.uninsertable, - resources.env.gcpUserAgent + resources.env.gcpUserAgent, + command.backoffPeriod.seconds ) .interruptWhen(resources.stop) .through[IO, Unit](Flow.sink(resources)) diff --git a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala index f5a5f124..65f19d2a 100644 --- a/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala +++ b/modules/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala @@ -12,18 +12,26 @@ */ package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services -import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} -import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} import cats.effect._ import cats.effect.std.Queue import cats.syntax.all._ +import fs2.Stream +import org.typelevel.log4cats.Logger +import org.threeten.bp.{Duration => ThreetenDuration} import com.google.pubsub.v1.PubsubMessage +import com.google.api.gax.core.ExecutorProvider +import com.google.api.gax.batching.FlowControlSettings +import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors} + import com.permutive.pubsub.consumer.{ConsumerRecord, Model} import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig} +import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload} import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader -import fs2.Stream -import org.typelevel.log4cats.Logger +import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} /** Module responsible for reading Pub/Sub */ object PubSub { @@ -33,18 +41,72 @@ object PubSub { projectId: String, subscription: String, uninsertable: Queue[F, BadRow], - gcpUserAgent: GcpUserAgent + gcpUserAgent: GcpUserAgent, + backoffPeriod: FiniteDuration ): Stream[F, ConsumerRecord[F, EventContainer]] = PubsubGoogleConsumer.subscribe[F, EventContainer]( Model.ProjectId(projectId), Model.Subscription(subscription), (msg, err, ack, _) => callback[F](msg, err, ack, uninsertable), PubsubGoogleConsumerConfig[F]( - onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), - customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))) + onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"), + customizeSubscriber = Some { + _.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)) + .setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour))) + .setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second))) + .setExecutorProvider { + new ExecutorProvider { + def shouldAutoClose: Boolean = true + def getExecutor: ScheduledExecutorService = scheduledExecutorService + } + } + .setFlowControlSettings { + // Switch off any flow control, because we handle it ourselves via fs2's backpressure + FlowControlSettings.getDefaultInstance + } + } ) ) + private def convertDuration(d: FiniteDuration): ThreetenDuration = + ThreetenDuration.ofMillis(d.toMillis) + + def scheduledExecutorService: ScheduledExecutorService = + new ForwardingListeningExecutorService with ScheduledExecutorService { + val delegate = MoreExecutors.newDirectExecutorService + lazy val scheduler = new ScheduledThreadPoolExecutor(1) // I think this scheduler is never used, but I implement it here for safety + override def schedule[V]( + callable: Callable[V], + delay: Long, + unit: TimeUnit + ): ScheduledFuture[V] = + scheduler.schedule(callable, delay, unit) + override def schedule( + runnable: Runnable, + delay: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.schedule(runnable, delay, unit) + override def scheduleAtFixedRate( + runnable: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.scheduleAtFixedRate(runnable, initialDelay, period, unit) + override def scheduleWithFixedDelay( + runnable: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit + ): ScheduledFuture[_] = + scheduler.scheduleWithFixedDelay(runnable, initialDelay, delay, unit) + override def shutdown(): Unit = { + delegate.shutdown() + scheduler.shutdown() + } + } + private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = { val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil) val failure = Failure.LoaderRecoveryFailure(info) diff --git a/project/plugins.sbt b/project/plugins.sbt index e8115c34..7ac551be 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,4 +6,4 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.0") addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.3.3") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") -addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1") +addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.2")