Re-implement library based on Tapir's JSON Schema module and Tapir's JSON Pickler only for Scala 3
calvinlfer committed Jul 15, 2024
1 parent fd1bfbb commit d7beba0
Showing 12 changed files with 390 additions and 562 deletions.
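
For context on the new approach: Tapir's JSON Pickler derives a uPickle-based JSON codec together with a Tapir schema for a case class at compile time. A minimal round-trip sketch, assuming Tapir's documented `Pickler.derived` and `toCodec` entry points; the `Person` type is illustrative and not part of this repository:

```scala
import sttp.tapir.json.pickler.Pickler

// Illustrative payload type, not from this repository.
case class Person(name: String, age: Int)

// Derives both the JSON reader/writer and the Tapir Schema[Person].
given Pickler[Person] = Pickler.derived

@main def roundTrip(): Unit =
  val codec   = summon[Pickler[Person]].toCodec // Tapir JsonCodec[Person]
  val json    = codec.encode(Person("Ada", 36)) // JSON string
  val decoded = codec.decode(json)              // DecodeResult[Person]
  println(s"$json -> $decoded")
```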
4 changes: 2 additions & 2 deletions .scalafmt.conf
@@ -1,4 +1,4 @@
version = 3.7.4
version = 3.8.2
maxColumn = 120
align.preset = most
align.multiline = false
@@ -11,7 +11,7 @@ includeCurlyBraceInSelectChains = false
danglingParentheses.preset = true
optIn.annotationNewlines = true
newlines.alwaysBeforeMultilineDef = false
runner.dialect = scala213source3
runner.dialect = scala3
rewrite {
rules = [AvoidInfix, PreferCurlyFors, SortImports, RedundantBraces, RedundantParens]
redundantBraces.maxLines = 1
76 changes: 31 additions & 45 deletions build.sbt
@@ -1,12 +1,26 @@
import org.typelevel.scalacoptions.{ScalaVersion, ScalacOptions}

inThisBuild {
val scala212 = "2.12.18"
val scala213 = "2.13.11"
val scala3 = "3.3.3"

Seq(
scalaVersion := scala213,
crossScalaVersions := Seq(scala212, scala213),
scalaVersion := scala3,
scalacOptions ++= ScalacOptions
.tokensForVersion(
ScalaVersion.fromString(scala3).right.get,
Set(
ScalacOptions.encoding("utf8"),
ScalacOptions.feature,
ScalacOptions.unchecked,
ScalacOptions.deprecation,
ScalacOptions.warnValueDiscard,
ScalacOptions.warnDeadCode,
ScalacOptions.release("17"),
ScalacOptions.privateKindProjector
)
),
versionScheme := Some("early-semver"),
githubWorkflowJavaVersions := List(JavaSpec.temurin("11")),
githubWorkflowJavaVersions := List(JavaSpec.temurin("17")),
githubWorkflowTargetTags ++= Seq("v*"),
githubWorkflowPublishTargetBranches := Seq(
RefPredicate.StartsWith(Ref.Tag("v")),
@@ -39,35 +53,6 @@ inThisBuild {
)
}

ThisBuild / scalacOptions ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor @ (12 | 13))) =>
val base = Seq(
"-deprecation",
"-encoding",
"UTF-8",
"-feature",
"-language:implicitConversions",
"-unchecked",
"-language:higherKinds",
"-Xlint",
"-Ywarn-dead-code",
"-Ywarn-numeric-widen",
"-Ywarn-value-discard",
"-Ywarn-unused",
"-Xsource:3"
)
if (minor == 12) "-Ypartial-unification" +: base
else base

case Some((3, _)) =>
Seq.empty

case Some(_) | None =>
Seq.empty
}
}

resolvers ++= Seq("confluent".at("https://packages.confluent.io/maven/"))

lazy val root =
@@ -78,19 +63,20 @@ lazy val root =
libraryDependencies ++= {
val circe = "io.circe"
val fd4s = "com.github.fd4s"
val fs2KafkaV = "3.0.1"
val tapir = "com.softwaremill.sttp.tapir"
val fs2KafkaV = "3.5.1"
val tapirV = "1.10.13"

Seq(
fd4s %% "fs2-kafka" % fs2KafkaV,
fd4s %% "fs2-kafka-vulcan" % fs2KafkaV,
"com.github.andyglow" %% "scala-jsonschema" % "0.7.9",
circe %% "circe-jackson212" % "0.14.0",
circe %% "circe-generic" % "0.14.5",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
"org.typelevel" %% "munit-cats-effect" % "2.0.0-M3" % Test,
"com.dimafeng" %% "testcontainers-scala-munit" % "0.40.17" % Test,
"ch.qos.logback" % "logback-classic" % "1.4.7" % Test,
"io.confluent" % "kafka-json-schema-serializer" % "7.4.0"
fd4s %% "fs2-kafka" % fs2KafkaV,
tapir %% "tapir-json-pickler" % tapirV,
tapir %% "tapir-apispec-docs" % tapirV,
"com.softwaremill.sttp.apispec" %% "jsonschema-circe" % "0.10.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0",
"org.typelevel" %% "munit-cats-effect" % "2.0.0-M3" % Test,
"com.dimafeng" %% "testcontainers-scala-munit" % "0.41.4" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.6" % Test,
"io.confluent" % "kafka-json-schema-serializer" % "7.6.1"
)
}
)
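
The swapped dependencies outline the new schema pipeline: tapir-apispec-docs converts a Tapir `Schema` into an sttp.apispec schema model, and jsonschema-circe renders that model as a JSON Schema document via circe. A hedged sketch of that conversion, assuming the documented `TapirSchemaToJsonSchema` converter and the circe encoders from `sttp.apispec.circe`; the `Person` type is illustrative:

```scala
import io.circe.syntax.*
import sttp.apispec.circe.*                                   // circe Encoder for sttp.apispec.Schema
import sttp.tapir.Schema
import sttp.tapir.docs.apispec.schema.TapirSchemaToJsonSchema
import sttp.tapir.generic.auto.*                              // derives Schema[Person]

// Illustrative payload type, not from this repository.
case class Person(name: String, age: Int)

@main def printJsonSchema(): Unit =
  val tapirSchema = summon[Schema[Person]]
  // Convert the Tapir schema to the apispec (JSON Schema) model, then to circe JSON.
  val jsonSchema = TapirSchemaToJsonSchema(tapirSchema, markOptionsAsNullable = true)
  println(jsonSchema.asJson.deepDropNullValues.spaces2)
```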
8 changes: 3 additions & 5 deletions docker-compose.yaml
@@ -1,8 +1,6 @@
version: '2.1'

services:
zoo1:
image: confluentinc/cp-zookeeper:7.4.0
image: confluentinc/cp-zookeeper:7.5.5
restart: unless-stopped
hostname: zoo1
ports:
@@ -12,7 +10,7 @@ services:
ZOOKEEPER_TICK_TIME: 2000

kafka1:
image: confluentinc/cp-kafka:7.4.0
image: confluentinc/cp-kafka:7.5.5
hostname: kafka1
ports:
- "9092:9092"
@@ -45,4 +43,4 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- zoo1
- kafka1
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.9.0
sbt.version = 1.10.1
6 changes: 4 additions & 2 deletions project/plugins.sbt
@@ -1,4 +1,6 @@
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12")
addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.15.0")
addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.24.0")

libraryDependencies += "org.typelevel" %% "scalac-options" % "0.1.5"
@@ -1,121 +1,36 @@
package io.kaizensolutions.jsonschema

import cats.effect.{Ref, Sync}
import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import fs2.kafka.{Deserializer, KeyDeserializer, ValueDeserializer}
import io.circe.Decoder
import io.circe.jackson.jacksonToCirce
import com.fasterxml.jackson.databind.JsonNode
import fs2.kafka.*
import sttp.tapir.json.pickler.Pickler
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.json.JsonSchema
import io.confluent.kafka.schemaregistry.json.jackson.Jackson
import org.apache.kafka.common.errors.SerializationException
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
import scala.jdk.CollectionConverters.*

import java.io.{ByteArrayInputStream, IOException}
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// See AbstractKafkaJsonSchemaDeserializer
object JsonSchemaDeserializer {
def forValue[F[_]: Sync, A: Decoder](
settings: JsonSchemaDeserializerSettings,
client: SchemaRegistryClient
)(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[ValueDeserializer[F, A]] =
toJsonSchema[F, A](jsonSchema, settings.jsonSchemaId)
.flatMap(create(settings, client, _))
.map(identity)

def forKey[F[_]: Sync, A: Decoder](
settings: JsonSchemaDeserializerSettings,
private[jsonschema] object JsonSchemaDeserializer:
def create[F[_], A](
isKey: Boolean,
confluentConfig: Map[String, Any],
client: SchemaRegistryClient
)(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[KeyDeserializer[F, A]] =
toJsonSchema[F, A](jsonSchema, settings.jsonSchemaId)
.flatMap(create(settings, client, _))
.map(identity)

def create[F[_]: Sync, A: Decoder](
settings: JsonSchemaDeserializerSettings,
client: SchemaRegistryClient,
schema: JsonSchema
): F[Deserializer[F, A]] = {
// NOTE: This is a workaround for Scala 2.12.x
val refMakeInstance = Ref.Make.syncInstance[F]
Ref.of[F, Set[Int]](Set.empty[Int])(refMakeInstance).map { cache =>
val objectMapper = Jackson
.newObjectMapper()
.configure(
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
settings.failOnUnknownKeys
)

new JsonSchemaDeserializer[F, A](settings, schema, objectMapper, cache, client).jsonSchemaDeserializer
}
}

}
private class JsonSchemaDeserializer[F[_]: Sync, A](
settings: JsonSchemaDeserializerSettings,
clientSchema: JsonSchema,
objectMapper: ObjectMapper,
compatSubjectIdCache: Ref[F, Set[Int]],
client: SchemaRegistryClient
)(implicit decoder: Decoder[A]) {
private val MagicByte: Byte = 0x0
private val IdSize: Int = 4

def jsonSchemaDeserializer: Deserializer[F, A] =
Deserializer.instance { (_, _, bytes) =>
Sync[F].delay {
val buffer = getByteBuffer(bytes)
val id = buffer.getInt()
val serverSchema = client.getSchemaById(id).asInstanceOf[JsonSchema]
val bufferLength = buffer.limit() - 1 - IdSize
val start = buffer.position() + buffer.arrayOffset()
val jsonNode: JsonNode =
objectMapper.readTree(new ByteArrayInputStream(buffer.array, start, bufferLength))

if (settings.validatePayloadAgainstServerSchema) {
serverSchema.validate(jsonNode)
}

if (settings.validatePayloadAgainstClientSchema) {
clientSchema.validate(jsonNode)
}

(id, serverSchema, jsonNode)
}.flatMap { case (serverId, serverSchema, jsonNode) =>
val check =
if (settings.validateClientSchemaAgainstServer)
checkSchemaCompatibility(serverId, serverSchema)
else Sync[F].unit

check.as(jacksonToCirce(jsonNode))
}
.map(decoder.decodeJson)
.rethrow
}

private def getByteBuffer(payload: Array[Byte]): ByteBuffer = {
val buffer = ByteBuffer.wrap(payload)
if (buffer.get() != MagicByte)
throw new SerializationException("Unknown magic byte when deserializing from Kafka")
buffer
}

private def checkSchemaCompatibility(serverSubjectId: Int, serverSchema: JsonSchema): F[Unit] = {
val checkSchemaUpdateCache =
Sync[F].delay {
val incompatibilities = clientSchema.isBackwardCompatible(serverSchema)
if (!incompatibilities.isEmpty)
throw new IOException(
s"Incompatible consumer schema with server schema: ${incompatibilities.toArray.mkString(", ")}"
)
else ()
} *> compatSubjectIdCache.update(_ + serverSubjectId)

for {
existing <- compatSubjectIdCache.get
_ <- if (existing.contains(serverSubjectId)) Sync[F].unit else checkSchemaUpdateCache
} yield ()
}
}
)(using p: Pickler[A], sync: Sync[F]): Resource[F, Deserializer[F, A]] =
Resource
.make(acquire = sync.delay(KafkaJsonSchemaDeserializer[JsonNode](client)))(des => sync.delay(des.close()))
.evalTap: des =>
sync.delay(des.configure(confluentConfig.asJava, isKey))
.map(u => JsonSchemaDeserializer(u).deserializer)

private class JsonSchemaDeserializer[F[_], A](underlying: KafkaJsonSchemaDeserializer[JsonNode])(using
sync: Sync[F],
pickler: Pickler[A]
):
import pickler.innerUpickle.*
private given Reader[A] = pickler.innerUpickle.reader

val deserializer: Deserializer[F, A] =
Deserializer
.delegate(underlying)
.map: node =>
Either.catchNonFatal(read[A](node.toString()))
.rethrow
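
For reference, the new factory returns a Resource that acquires, configures, and eventually closes the underlying Confluent KafkaJsonSchemaDeserializer. A usage sketch; the `Person` type, registry URL, and config values are illustrative, and since `create` is `private[jsonschema]` this would be called from inside the io.kaizensolutions.jsonschema package (e.g. by a public wrapper):

```scala
import cats.effect.{IO, Resource}
import fs2.kafka.Deserializer
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import sttp.tapir.json.pickler.Pickler

// Illustrative payload type and configuration, not from this repository.
case class Person(name: String, age: Int)
given Pickler[Person] = Pickler.derived

val schemaRegistryUrl = "http://localhost:8081"
val client            = new CachedSchemaRegistryClient(schemaRegistryUrl, 1024)

// Resource-managed fs2-kafka value deserializer backed by the Confluent JSON Schema deserializer.
val personDeserializer: Resource[IO, Deserializer[IO, Person]] =
  JsonSchemaDeserializer.create[IO, Person](
    isKey = false,
    confluentConfig = Map("schema.registry.url" -> schemaRegistryUrl),
    client = client
  )
```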
