Skip to content

Commit

Permalink
Transformer Batch: make it possible to disable spark caching via config (close #808)

Browse files Browse the repository at this point in the history
  • Loading branch information
spenes authored and chuwy committed Apr 1, 2022
1 parent 0bd0467 commit af0a1ff
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ object TransformerConfig {
// Monitoring settings; currently only an optional Sentry block.
final case class Monitoring(sentry: Option[Sentry])
// Sentry configuration — dsn is the endpoint URI for error reporting.
final case class Sentry(dsn: URI)

final case class FeatureFlags(legacyMessageFormat: Boolean)
// Feature toggles. sparkCacheEnabled: Some(b) forces Spark caching on/off explicitly;
// None defers the decision to the job (ShredJob falls back to whether cross-batch
// deduplication is enabled, i.e. an events manifest is configured).
final case class FeatureFlags(legacyMessageFormat: Boolean, sparkCacheEnabled: Option[Boolean])

// Window of runs to process: optional lower bound (timestamp and/or max age) and optional upper bound.
final case class RunInterval(sinceTimestamp: Option[RunInterval.IntervalInstant], sinceAge: Option[FiniteDuration], until: Option[RunInterval.IntervalInstant])
object RunInterval {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ object TransformerConfigSpec {
// Test fixture: monitoring with a Sentry DSN configured.
val exampleMonitoring = TransformerConfig.Monitoring(
Some(TransformerConfig.Sentry(URI.create("http://sentry.acme.com"))),
)
val exampleDefaultFeatureFlags = TransformerConfig.FeatureFlags(false)
// Test fixture: monitoring with Sentry disabled.
val exampleDefaultMonitoring = TransformerConfig.Monitoring(None)
// Test fixture: deduplication configured with Synthetic.Broadcast(1).
val exampleDeduplication = TransformerConfig.Deduplication(TransformerConfig.Deduplication.Synthetic.Broadcast(1))
// Test fixture: run interval with no bounds set.
val emptyRunInterval = TransformerConfig.RunInterval(None, None, None)
Expand All @@ -236,6 +235,7 @@ object TransformerConfigSpec {
Some(Duration.create("14 days").asInstanceOf[FiniteDuration]),
Some(TransformerConfig.RunInterval.IntervalInstant(Instant.parse("2021-12-10T18:34:52.00Z")))
)
// Test fixture: default flags — legacy message format off, sparkCacheEnabled left
// unset (None) so the job decides caching itself.
val exampleDefaultFeatureFlags = TransformerConfig.FeatureFlags(false, None)
// Test fixtures: validations with and without a timestamp configured.
val exampleValidations = Validations(Some(Instant.parse("2021-11-18T11:00:00.00Z")))
val emptyValidations = Validations(None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ShredJob[T](@transient val spark: SparkSession,
// Handling of properly-formed rows; drop bad, turn proper events to `Event`
// Perform in-batch and cross-batch natural deduplications and writes found types to accumulator
// only one event from an event id and event fingerprint combination is kept
val good = common
val goodWithoutCache = common
.flatMap { shredded => shredded.toOption }
.groupBy { s => (s.event_id, s.event_fingerprint.getOrElse(UUID.randomUUID().toString)) }
.flatMap { case (_, s) =>
Expand All @@ -119,7 +119,12 @@ class ShredJob[T](@transient val spark: SparkSession,
}
}
.setName("good")
.cache()

// Check first if spark cache is enabled explicitly.
// If it is not enabled, check if CB deduplication is enabled.
val shouldCacheEnabled = config.featureFlags.sparkCacheEnabled.getOrElse(eventsManifest.isDefined)

val good = if (shouldCacheEnabled) goodWithoutCache.cache() else goodWithoutCache

// The events counter
// Using accumulators for counting is unreliable, but we don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ object ShredJobSpec {
TransformerConfig.Monitoring(None),
TransformerConfig.Deduplication(TransformerConfig.Deduplication.Synthetic.Broadcast(1)),
TransformerConfig.RunInterval(None, None, None),
TransformerConfig.FeatureFlags(false),
TransformerConfig.FeatureFlags(false, None),
TransformerConfig.Validations(None)
)
}
Expand Down

0 comments on commit af0a1ff

Please sign in to comment.