Release/1.7.1 #375

Merged 3 commits on Apr 26, 2024
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,3 +1,8 @@
+Version 1.7.1 (2024-04-26)
+--------------------------
+Bump sbt-snowplow-release to 0.3.2
+Repeater: PubSub ack extensions should match backoff delay (#373)

Version 1.7.0 (2023-11-20)
--------------------------
Set GCP user agent header for BQ and Pubsub (#363)
Flow.scala
@@ -100,8 +100,11 @@ object Flow {
.as(Inserted.asInstanceOf[InsertStatus])
.value
} else {
-        Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >>
-          event.nack.as(Retry.asInstanceOf[InsertStatus].asRight)
+        Logger[F]
+          .debug(
+            s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Ignoring it so PubSub re-sends it later."
+          )
+          .as(Retry.asInstanceOf[InsertStatus].asRight)
}
}
} yield result
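
The change above stops nacking events that are not ready yet: instead of asking Pub/Sub to redeliver immediately, the Repeater now neither acks nor nacks, so the message comes back only once its (now backoff-matched) ack deadline lapses. A minimal sketch of the two strategies, using only the com.permutive.pubsub ConsumerRecord API already imported in this PR; the object and method names below are illustrative, not part of the codebase:

import cats.effect.IO
import com.permutive.pubsub.consumer.ConsumerRecord

object RedeliverySketch {
  // Old behaviour: an explicit nack tells Pub/Sub to redeliver straight away,
  // so a "not ready yet" event loops back in a tight retry cycle.
  def retryImmediately[A](record: ConsumerRecord[IO, A]): IO[Unit] =
    record.nack

  // New behaviour: neither ack nor nack. The message stays leased until its
  // ack deadline (extended to roughly the configured backoff period) expires,
  // and only then does Pub/Sub re-send it.
  def retryAfterBackoff[A](record: ConsumerRecord[IO, A]): IO[Unit] =
    IO.unit
}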
Repeater.scala
@@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor
import cats.effect._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
+import scala.concurrent.duration.DurationInt

object Repeater extends IOApp {

@@ -34,7 +35,8 @@ object Repeater extends IOApp {
resources.env.projectId,
resources.env.config.input.subscription,
resources.uninsertable,
-          resources.env.gcpUserAgent
+          resources.env.gcpUserAgent,
+          command.backoffPeriod.seconds
)
.interruptWhen(resources.stop)
.through[IO, Unit](Flow.sink(resources))
PubSub.scala
@@ -12,18 +12,26 @@
*/
package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services

-import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
-import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
import cats.effect._
import cats.effect.std.Queue
import cats.syntax.all._
-import fs2.Stream
-import org.typelevel.log4cats.Logger
+import org.threeten.bp.{Duration => ThreetenDuration}
import com.google.pubsub.v1.PubsubMessage
+import com.google.api.gax.core.ExecutorProvider
+import com.google.api.gax.batching.FlowControlSettings
+import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors}

import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
+import fs2.Stream
+import org.typelevel.log4cats.Logger
+import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}

+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

/** Module responsible for reading Pub/Sub */
object PubSub {
@@ -33,18 +41,72 @@ object PubSub {
projectId: String,
subscription: String,
uninsertable: Queue[F, BadRow],
-    gcpUserAgent: GcpUserAgent
+    gcpUserAgent: GcpUserAgent,
+    backoffPeriod: FiniteDuration
): Stream[F, ConsumerRecord[F, EventContainer]] =
PubsubGoogleConsumer.subscribe[F, EventContainer](
Model.ProjectId(projectId),
Model.Subscription(subscription),
(msg, err, ack, _) => callback[F](msg, err, ack, uninsertable),
PubsubGoogleConsumerConfig[F](
-      onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
-      customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)))
+      onFailedTerminate   = t => Logger[F].error(s"Terminating consumer due to $t"),
+      customizeSubscriber = Some {
+        _.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
+          .setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour)))
+          .setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second)))
+          .setExecutorProvider {
+            new ExecutorProvider {
+              def shouldAutoClose: Boolean = true
+              def getExecutor: ScheduledExecutorService = scheduledExecutorService
+            }
+          }
+          .setFlowControlSettings {
+            // Switch off any flow control, because we handle it ourselves via fs2's backpressure
+            FlowControlSettings.getDefaultInstance
+          }
+      }
)
)

+  private def convertDuration(d: FiniteDuration): ThreetenDuration =
+    ThreetenDuration.ofMillis(d.toMillis)
+
+  def scheduledExecutorService: ScheduledExecutorService =
+    new ForwardingListeningExecutorService with ScheduledExecutorService {
+      val delegate = MoreExecutors.newDirectExecutorService
+      lazy val scheduler = new ScheduledThreadPoolExecutor(1) // This scheduler is likely never used, but it is kept here as a safety net
+      override def schedule[V](
+        callable: Callable[V],
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[V] =
+        scheduler.schedule(callable, delay, unit)
+      override def schedule(
+        runnable: Runnable,
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.schedule(runnable, delay, unit)
+      override def scheduleAtFixedRate(
+        runnable: Runnable,
+        initialDelay: Long,
+        period: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.scheduleAtFixedRate(runnable, initialDelay, period, unit)
+      override def scheduleWithFixedDelay(
+        runnable: Runnable,
+        initialDelay: Long,
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.scheduleWithFixedDelay(runnable, initialDelay, delay, unit)
+      override def shutdown(): Unit = {
+        delegate.shutdown()
+        scheduler.shutdown()
+      }
+    }

private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = {
val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil)
val failure = Failure.LoaderRecoveryFailure(info)
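The subscriber settings above tie the ack-deadline extension to the Repeater's backoff period, capped at one hour of total extension and at just under the 600-second ceiling Pub/Sub allows for a single deadline extension. A small sketch of that clamping arithmetic, reusing only names from the diff above with an illustrative 900-second backoff; it is not part of the codebase:

import scala.concurrent.duration._
import org.threeten.bp.{Duration => ThreetenDuration}

object AckExtensionSketch {
  // Same conversion as in the diff above.
  def convertDuration(d: FiniteDuration): ThreetenDuration =
    ThreetenDuration.ofMillis(d.toMillis)

  // Derive the two subscriber settings from a configured backoff period.
  def ackExtensionSettings(backoffPeriod: FiniteDuration): (ThreetenDuration, ThreetenDuration) = {
    val maxAckExtensionPeriod = convertDuration(backoffPeriod.min(1.hour))
    val minDurationPerAckExtension = convertDuration(backoffPeriod.min(600.seconds).minus(1.second))
    (maxAckExtensionPeriod, minDurationPerAckExtension)
  }

  // For a 900-second (15-minute) backoff this yields PT15M and PT9M59S: the
  // lease is extended in steps of at least 599 seconds, for up to 15 minutes,
  // so an un-acked "not ready" event is not re-sent before the backoff elapses.
  val example: (ThreetenDuration, ThreetenDuration) = ackExtensionSettings(900.seconds)
}

As the comment in the diff notes, client-side flow control is left at the defaults because fs2's backpressure already throttles consumption.
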
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -6,4 +6,4 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.0")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.3.3")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1")
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.2")