From d7beba09a054f04aa84f22437468e9215b60b3b1 Mon Sep 17 00:00:00 2001 From: calvinlfer Date: Sun, 14 Jul 2024 20:49:44 -0400 Subject: [PATCH] Re-implement library based on Tapir's JSON Schema module and Tapir's JSON Pickler only for Scala 3 --- .scalafmt.conf | 4 +- build.sbt | 76 +++--- docker-compose.yaml | 8 +- project/build.properties | 2 +- project/plugins.sbt | 6 +- .../jsonschema/JsonSchemaDeserializer.scala | 145 +++------- .../JsonSchemaDeserializerSettings.scala | 97 ++++--- .../jsonschema/JsonSchemaSerializer.scala | 248 +++++------------- .../JsonSchemaSerializerSettings.scala | 144 +++++++--- .../kaizensolutions/jsonschema/package.scala | 44 ---- src/test/resources/logback-test.xml | 2 +- .../jsonschema/JsonSchemaSerDesSpec.scala | 176 ++++++------- 12 files changed, 390 insertions(+), 562 deletions(-) delete mode 100644 src/main/scala/io/kaizensolutions/jsonschema/package.scala diff --git a/.scalafmt.conf b/.scalafmt.conf index 9e6a4c7..96f7c09 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = 3.7.4 +version = 3.8.2 maxColumn = 120 align.preset = most align.multiline = false @@ -11,7 +11,7 @@ includeCurlyBraceInSelectChains = false danglingParentheses.preset = true optIn.annotationNewlines = true newlines.alwaysBeforeMultilineDef = false -runner.dialect = scala213source3 +runner.dialect = scala3 rewrite { rules = [AvoidInfix, PreferCurlyFors, SortImports, RedundantBraces, RedundantParens] redundantBraces.maxLines = 1 diff --git a/build.sbt b/build.sbt index 615b245..9f54f2a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,12 +1,26 @@ +import org.typelevel.scalacoptions.{ScalaVersion, ScalacOptions} + inThisBuild { - val scala212 = "2.12.18" - val scala213 = "2.13.11" + val scala3 = "3.3.3" Seq( - scalaVersion := scala213, - crossScalaVersions := Seq(scala212, scala213), + scalaVersion := scala3, + scalacOptions ++= ScalacOptions + .tokensForVersion( + ScalaVersion.fromString(scala3).right.get, + Set( + ScalacOptions.encoding("utf8"), + ScalacOptions.feature, + ScalacOptions.unchecked, + ScalacOptions.deprecation, + ScalacOptions.warnValueDiscard, + ScalacOptions.warnDeadCode, + ScalacOptions.release("17"), + ScalacOptions.privateKindProjector + ) + ), versionScheme := Some("early-semver"), - githubWorkflowJavaVersions := List(JavaSpec.temurin("11")), + githubWorkflowJavaVersions := List(JavaSpec.temurin("17")), githubWorkflowTargetTags ++= Seq("v*"), githubWorkflowPublishTargetBranches := Seq( RefPredicate.StartsWith(Ref.Tag("v")), @@ -39,35 +53,6 @@ inThisBuild { ) } -ThisBuild / scalacOptions ++= { - CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, minor @ (12 | 13))) => - val base = Seq( - "-deprecation", - "-encoding", - "UTF-8", - "-feature", - "-language:implicitConversions", - "-unchecked", - "-language:higherKinds", - "-Xlint", - "-Ywarn-dead-code", - "-Ywarn-numeric-widen", - "-Ywarn-value-discard", - "-Ywarn-unused", - "-Xsource:3" - ) - if (minor == 12) "-Ypartial-unification" +: base - else base - - case Some((3, _)) => - Seq.empty - - case Some(_) | None => - Seq.empty - } -} - resolvers ++= Seq("confluent".at("https://packages.confluent.io/maven/")) lazy val root = @@ -78,19 +63,20 @@ lazy val root = libraryDependencies ++= { val circe = "io.circe" val fd4s = "com.github.fd4s" - val fs2KafkaV = "3.0.1" + val tapir = "com.softwaremill.sttp.tapir" + val fs2KafkaV = "3.5.1" + val tapirV = "1.10.13" Seq( - fd4s %% "fs2-kafka" % fs2KafkaV, - fd4s %% "fs2-kafka-vulcan" % fs2KafkaV, - "com.github.andyglow" %% 
"scala-jsonschema" % "0.7.9", - circe %% "circe-jackson212" % "0.14.0", - circe %% "circe-generic" % "0.14.5", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0", - "org.typelevel" %% "munit-cats-effect" % "2.0.0-M3" % Test, - "com.dimafeng" %% "testcontainers-scala-munit" % "0.40.17" % Test, - "ch.qos.logback" % "logback-classic" % "1.4.7" % Test, - "io.confluent" % "kafka-json-schema-serializer" % "7.4.0" + fd4s %% "fs2-kafka" % fs2KafkaV, + tapir %% "tapir-json-pickler" % tapirV, + tapir %% "tapir-apispec-docs" % tapirV, + "com.softwaremill.sttp.apispec" %% "jsonschema-circe" % "0.10.0", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0", + "org.typelevel" %% "munit-cats-effect" % "2.0.0-M3" % Test, + "com.dimafeng" %% "testcontainers-scala-munit" % "0.41.4" % Test, + "ch.qos.logback" % "logback-classic" % "1.5.6" % Test, + "io.confluent" % "kafka-json-schema-serializer" % "7.6.1" ) } ) diff --git a/docker-compose.yaml b/docker-compose.yaml index c40cfa4..e536ad2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,8 +1,6 @@ -version: '2.1' - services: zoo1: - image: confluentinc/cp-zookeeper:7.4.0 + image: confluentinc/cp-zookeeper:7.5.5 restart: unless-stopped hostname: zoo1 ports: @@ -12,7 +10,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 kafka1: - image: confluentinc/cp-kafka:7.4.0 + image: confluentinc/cp-kafka:7.5.5 hostname: kafka1 ports: - "9092:9092" @@ -45,4 +43,4 @@ services: SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 depends_on: - zoo1 - - kafka1 \ No newline at end of file + - kafka1 diff --git a/project/build.properties b/project/build.properties index 8fd7d2e..136f452 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.9.0 +sbt.version = 1.10.1 diff --git a/project/plugins.sbt b/project/plugins.sbt index e312e77..3e79d57 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,6 @@ addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.3") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12") -addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.15.0") +addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.24.0") + +libraryDependencies += "org.typelevel" %% "scalac-options" % "0.1.5" diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializer.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializer.scala index a5e193f..379487c 100644 --- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializer.scala +++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializer.scala @@ -1,121 +1,36 @@ package io.kaizensolutions.jsonschema -import cats.effect.{Ref, Sync} +import cats.effect.{Resource, Sync} import cats.syntax.all.* -import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper} -import fs2.kafka.{Deserializer, KeyDeserializer, ValueDeserializer} -import io.circe.Decoder -import io.circe.jackson.jacksonToCirce +import com.fasterxml.jackson.databind.JsonNode +import fs2.kafka.* +import sttp.tapir.json.pickler.Pickler import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient -import io.confluent.kafka.schemaregistry.json.JsonSchema -import io.confluent.kafka.schemaregistry.json.jackson.Jackson -import org.apache.kafka.common.errors.SerializationException +import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer +import 
scala.jdk.CollectionConverters.* -import java.io.{ByteArrayInputStream, IOException} -import java.nio.ByteBuffer -import scala.reflect.ClassTag - -// See AbstractKafkaJsonSchemaDeserializer -object JsonSchemaDeserializer { - def forValue[F[_]: Sync, A: Decoder]( - settings: JsonSchemaDeserializerSettings, - client: SchemaRegistryClient - )(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[ValueDeserializer[F, A]] = - toJsonSchema[F, A](jsonSchema, settings.jsonSchemaId) - .flatMap(create(settings, client, _)) - .map(identity) - - def forKey[F[_]: Sync, A: Decoder]( - settings: JsonSchemaDeserializerSettings, +private[jsonschema] object JsonSchemaDeserializer: + def create[F[_], A]( + isKey: Boolean, + confluentConfig: Map[String, Any], client: SchemaRegistryClient - )(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[KeyDeserializer[F, A]] = - toJsonSchema[F, A](jsonSchema, settings.jsonSchemaId) - .flatMap(create(settings, client, _)) - .map(identity) - - def create[F[_]: Sync, A: Decoder]( - settings: JsonSchemaDeserializerSettings, - client: SchemaRegistryClient, - schema: JsonSchema - ): F[Deserializer[F, A]] = { - // NOTE: This is a workaround for Scala 2.12.x - val refMakeInstance = Ref.Make.syncInstance[F] - Ref.of[F, Set[Int]](Set.empty[Int])(refMakeInstance).map { cache => - val objectMapper = Jackson - .newObjectMapper() - .configure( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, - settings.failOnUnknownKeys - ) - - new JsonSchemaDeserializer[F, A](settings, schema, objectMapper, cache, client).jsonSchemaDeserializer - } - } - -} -private class JsonSchemaDeserializer[F[_]: Sync, A]( - settings: JsonSchemaDeserializerSettings, - clientSchema: JsonSchema, - objectMapper: ObjectMapper, - compatSubjectIdCache: Ref[F, Set[Int]], - client: SchemaRegistryClient -)(implicit decoder: Decoder[A]) { - private val MagicByte: Byte = 0x0 - private val IdSize: Int = 4 - - def jsonSchemaDeserializer: Deserializer[F, A] = - Deserializer.instance { (_, _, bytes) => - Sync[F].delay { - val buffer = getByteBuffer(bytes) - val id = buffer.getInt() - val serverSchema = client.getSchemaById(id).asInstanceOf[JsonSchema] - val bufferLength = buffer.limit() - 1 - IdSize - val start = buffer.position() + buffer.arrayOffset() - val jsonNode: JsonNode = - objectMapper.readTree(new ByteArrayInputStream(buffer.array, start, bufferLength)) - - if (settings.validatePayloadAgainstServerSchema) { - serverSchema.validate(jsonNode) - } - - if (settings.validatePayloadAgainstClientSchema) { - clientSchema.validate(jsonNode) - } - - (id, serverSchema, jsonNode) - }.flatMap { case (serverId, serverSchema, jsonNode) => - val check = - if (settings.validateClientSchemaAgainstServer) - checkSchemaCompatibility(serverId, serverSchema) - else Sync[F].unit - - check.as(jacksonToCirce(jsonNode)) - } - .map(decoder.decodeJson) - .rethrow - } - - private def getByteBuffer(payload: Array[Byte]): ByteBuffer = { - val buffer = ByteBuffer.wrap(payload) - if (buffer.get() != MagicByte) - throw new SerializationException("Unknown magic byte when deserializing from Kafka") - buffer - } - - private def checkSchemaCompatibility(serverSubjectId: Int, serverSchema: JsonSchema): F[Unit] = { - val checkSchemaUpdateCache = - Sync[F].delay { - val incompatibilities = clientSchema.isBackwardCompatible(serverSchema) - if (!incompatibilities.isEmpty) - throw new IOException( - s"Incompatible consumer schema with server schema: ${incompatibilities.toArray.mkString(", ")}" - ) - else () - } *> compatSubjectIdCache.update(_ + 
serverSubjectId) - - for { - existing <- compatSubjectIdCache.get - _ <- if (existing.contains(serverSubjectId)) Sync[F].unit else checkSchemaUpdateCache - } yield () - } -} + )(using p: Pickler[A], sync: Sync[F]): Resource[F, Deserializer[F, A]] = + Resource + .make(acquire = sync.delay(KafkaJsonSchemaDeserializer[JsonNode](client)))(des => sync.delay(des.close())) + .evalTap: des => + sync.delay(des.configure(confluentConfig.asJava, isKey)) + .map(u => JsonSchemaDeserializer(u).deserializer) + +private class JsonSchemaDeserializer[F[_], A](underlying: KafkaJsonSchemaDeserializer[JsonNode])(using + sync: Sync[F], + pickler: Pickler[A] +): + import pickler.innerUpickle.* + private given Reader[A] = pickler.innerUpickle.reader + + val deserializer: Deserializer[F, A] = + Deserializer + .delegate(underlying) + .map: node => + Either.catchNonFatal(read[A](node.toString())) + .rethrow diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala index 21eac07..7e0d6c4 100644 --- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala +++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala @@ -1,52 +1,75 @@ package io.kaizensolutions.jsonschema +import cats.effect.Resource +import cats.effect.Sync +import com.fasterxml.jackson.databind.JsonNode +import fs2.kafka.* +import io.confluent.kafka.schemaregistry.SchemaProvider +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.* +import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig.* +import sttp.tapir.json.pickler.Pickler + +import scala.jdk.CollectionConverters.* + object JsonSchemaDeserializerSettings { val default: JsonSchemaDeserializerSettings = JsonSchemaDeserializerSettings() } -/** - * Settings that describe how to interact with Confluent's Schema Registry when - * deserializing data - * - * @param validatePayloadAgainstServerSchema - * will validate the payload against the schema on the server - * @param validatePayloadAgainstClientSchema - * will validate the payload against the schema derived from the datatype you - * specify - * @param validateClientSchemaAgainstServer - * will validate the schema you specify against the server's schema - * @param failOnUnknownKeys - * will specify failure when unknown JSON keys are encountered - * @param jsonSchemaId - * is used to override the schema ID of the data that is being consumed - */ final case class JsonSchemaDeserializerSettings( - validatePayloadAgainstServerSchema: Boolean = false, - validatePayloadAgainstClientSchema: Boolean = false, - validateClientSchemaAgainstServer: Boolean = false, - failOnUnknownKeys: Boolean = false, - jsonSchemaId: Option[String] = None + schemaRegistryUrl: String = "http://localhost:8081", + failOnUnknownProperties: Boolean = true, + failOnInvalidSchema: Boolean = false, + cacheCapacity: Int = 1024, + client: Option[SchemaRegistryClient] = None ) { self => - def withPayloadValidationAgainstServerSchema(b: Boolean): JsonSchemaDeserializerSettings = - self.copy(validatePayloadAgainstServerSchema = b) + def withSchemaRegistryUrl(url: String): JsonSchemaDeserializerSettings = + self.copy(schemaRegistryUrl = url) + + def 
withFailOnUnknownProperties(b: Boolean): JsonSchemaDeserializerSettings = + self.copy(failOnUnknownProperties = b) - def withPayloadValidationAgainstClientSchema(b: Boolean): JsonSchemaDeserializerSettings = - self.copy(validatePayloadAgainstClientSchema = b) + def withFailOnInvalidSchema(b: Boolean): JsonSchemaDeserializerSettings = + self.copy(failOnInvalidSchema = b) - def withAggressiveSchemaValidation(b: Boolean): JsonSchemaDeserializerSettings = - self.copy(validateClientSchemaAgainstServer = b) + def withClient(client: SchemaRegistryClient): JsonSchemaDeserializerSettings = + self.copy(client = Option(client)) - def withFailOnUnknownKeys(b: Boolean): JsonSchemaDeserializerSettings = - self.copy(failOnUnknownKeys = b) + def withCacheCapacity(value: Int): JsonSchemaDeserializerSettings = + self.copy(cacheCapacity = value) - def withAggressiveValidation(b: Boolean): JsonSchemaDeserializerSettings = - self.copy( - validatePayloadAgainstServerSchema = b, - validatePayloadAgainstClientSchema = b, - validateClientSchemaAgainstServer = b, - failOnUnknownKeys = b + def forKey[F[_], A](using Pickler[A], Sync[F]): Resource[F, KeyDeserializer[F, A]] = + create( + config + (JSON_KEY_TYPE -> classOf[JsonNode].getName()), + isKey = true ) - def withJsonSchemaId(id: String): JsonSchemaDeserializerSettings = - self.copy(jsonSchemaId = Some(id)) + def forValue[F[_], A](using Pickler[A], Sync[F]): Resource[F, ValueDeserializer[F, A]] = + create( + config + (JSON_VALUE_TYPE -> classOf[JsonNode].getName()), + isKey = false + ) + + private def config = Map( + SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl, + FAIL_INVALID_SCHEMA -> failOnInvalidSchema, + FAIL_UNKNOWN_PROPERTIES -> failOnUnknownProperties + ) + + private def create[F[_], A]( + confluentConfig: Map[String, Any], + isKey: Boolean + )(using p: Pickler[A], sync: Sync[F]): Resource[F, Deserializer[F, A]] = + client + .map(Resource.pure[F, SchemaRegistryClient]) + .getOrElse: + val providers: java.util.List[SchemaProvider] = List(new JsonSchemaProvider()).asJava + val acquire = sync.delay( + new CachedSchemaRegistryClient(schemaRegistryUrl, cacheCapacity, providers, confluentConfig.asJava) + ) + val release = (client: SchemaRegistryClient) => sync.delay(client.close()) + Resource.make(acquire)(release) + .flatMap(client => JsonSchemaDeserializer.create(isKey, confluentConfig, client)) } diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializer.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializer.scala index 875a5ad..d8fc9ca 100644 --- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializer.scala +++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializer.scala @@ -1,195 +1,67 @@ package io.kaizensolutions.jsonschema -import cats.effect.{Ref, Sync} -import cats.syntax.all.* +import cats.effect.Resource +import cats.effect.Sync +import cats.syntax.functor.* import com.fasterxml.jackson.databind.JsonNode -import fs2.kafka.{KeySerializer, Serializer, ValueSerializer} -import io.circe.Encoder -import io.circe.jackson.circeToJackson +import com.fasterxml.jackson.databind.ObjectMapper +import fs2.kafka.* import io.circe.syntax.* -import io.confluent.kafka.schemaregistry.ParsedSchema import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.json.JsonSchema -import io.confluent.kafka.schemaregistry.json.jackson.Jackson -import io.kaizensolutions.jsonschema.JsonSchemaSerializer.SubjectSchema - -import java.io.{ByteArrayOutputStream, 
IOException} -import java.nio.ByteBuffer -import scala.reflect.ClassTag -import scala.jdk.OptionConverters.* - -/** - * Look at Confluent's KafkaJsonSchemaSerializer -> - * AbstractKafkaJsonSchemaSerializer -> AbstractKafkaSchemaSerDe - * - * The real implementation does the following (minus a few details): - * - * 1. call configure (provide isKey to figure out the subject name) - * - determine whether we automatically register the schema (involves using - * the subject name and the topic) - * - or determine whether we use the latest schema - * - determine whether to use strict compatibility - * - determine whether to validate the body against the payload and fail if - * not correct 2. serialization of a message - * - extract the JSON Schema of the message (we don't need to do this every - * time because our Serializers are very specific) - * - register the schema (provided configuration) or use the latest schema - * - validate message against JSON schema (provided configuration) - * - write out message: magic byte ++ subject id ++ payload - * - * see AbstractKafkaJsonSchemaDeserializer for deserialization details - */ -object JsonSchemaSerializer { - final case class SubjectSchema(subject: String, schema: ParsedSchema) - - def forKey[F[_]: Sync, A: Encoder]( - settings: JsonSchemaSerializerSettings, - client: SchemaRegistryClient - )(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[KeySerializer[F, A]] = - toJsonSchema(jsonSchema, settings.jsonSchemaId) - .flatMap(forKey[F, A](settings, client, _)) - - def forKey[F[_]: Sync, A: Encoder]( - settings: JsonSchemaSerializerSettings, - client: SchemaRegistryClient, - schema: JsonSchema - ): F[KeySerializer[F, A]] = - Ref.of[F, Map[SubjectSchema, ParsedSchema]](Map.empty).map { cache => - new JsonSchemaSerializer[F, A](client, settings, cache, schema) - .jsonSchemaSerializer(true) - } - - def forValue[F[_]: Sync, A: Encoder]( - settings: JsonSchemaSerializerSettings, - client: SchemaRegistryClient - )(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[ValueSerializer[F, A]] = - toJsonSchema(jsonSchema, settings.jsonSchemaId) - .flatMap(forValue(settings, client, _)) - - def forValue[F[_]: Sync, A: Encoder]( - settings: JsonSchemaSerializerSettings, +import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils +import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer +import sttp.apispec.{ExampleSingleValue, SchemaType} +import sttp.apispec.circe.* +import sttp.tapir.docs.apispec.schema.* +import sttp.tapir.json.pickler.Pickler + +import scala.jdk.CollectionConverters.* + +private[jsonschema] object JsonSchemaSerializer: + def create[F[_], A]( + isKey: Boolean, + confluentConfig: java.util.Map[String, Any], + mapper: ObjectMapper, client: SchemaRegistryClient, - schema: JsonSchema - ): F[ValueSerializer[F, A]] = - Ref.of[F, Map[SubjectSchema, ParsedSchema]](Map.empty).map { cache => - new JsonSchemaSerializer[F, A](client, settings, cache, schema) - .jsonSchemaSerializer(false) - } -} - -private final class JsonSchemaSerializer[F[_]: Sync, A: Encoder]( - client: SchemaRegistryClient, - settings: JsonSchemaSerializerSettings, - cache: Ref[F, Map[SubjectSchema, ParsedSchema]], - clientSchema: JsonSchema -) { - val MagicByte: Byte = 0x0 - val IdSize: Int = 4 - - val objectWriter = Jackson.newObjectMapper().writer() - - def jsonSchemaSerializer(isKey: Boolean): Serializer[F, A] = { - val mkSubject = subjectName(isKey) _ - - Serializer.instance[F, A] { (topic, _, data) => - val jsonPayload: JsonNode = 
circeToJackson(data.asJson) - val subject = mkSubject(topic) - - val fSchema: F[JsonSchema] = - if (!settings.automaticRegistration && settings.useLatestVersion) - lookupLatestVersion(subject, clientSchema, cache, settings.latestCompatStrict) - .map(_.asInstanceOf[JsonSchema]) - else Sync[F].pure(clientSchema) - - val fId: F[Int] = - if (settings.automaticRegistration) registerSchema(subject, clientSchema) - else if (settings.useLatestVersion) - lookupLatestVersion(subject, clientSchema, cache, settings.latestCompatStrict) - .flatMap(s => getId(subject, s.asInstanceOf[JsonSchema])) - else getId(subject, clientSchema) - - for { - schema <- fSchema - _ <- validatePayload(schema, jsonPayload) - id <- fId - bytes <- Sync[F].delay { - val payloadBytes = objectWriter.writeValueAsBytes(jsonPayload) - val baos = new ByteArrayOutputStream() - baos.write(MagicByte.toInt) - baos.write(ByteBuffer.allocate(IdSize).putInt(id).array()) - baos.write(payloadBytes) - val bytes = baos.toByteArray - baos.close() - bytes - } - } yield bytes - } - } - - private def validatePayload(schema: JsonSchema, jsonPayload: JsonNode): F[Unit] = - Sync[F].whenA(settings.validatePayload) { - Sync[F].delay(schema.validate(jsonPayload)) - } - - private def subjectName(isKey: Boolean)(topic: String): String = - if (isKey) s"$topic-key" else s"$topic-value" - - private def registerSchema(subject: String, jsonSchema: JsonSchema): F[Int] = - Sync[F].delay(client.register(subject, jsonSchema)) - - private def getId(subject: String, jsonSchema: JsonSchema): F[Int] = - Sync[F].delay(client.getId(subject, jsonSchema)) - - private def fetchLatest(subject: String): F[ParsedSchema] = - Sync[F] - .delay(client.getLatestSchemaMetadata(subject)) - .flatMap { metadata => - Sync[F].delay { - // This requires JSON support to be configured in the Schema Registry Client - client.parseSchema( - metadata.getSchemaType, - metadata.getSchema, - metadata.getReferences - ) - }.flatMap { - _.toScala match { - case Some(schema) => - Sync[F].pure(schema) - case None => - Sync[F].delay(new JsonSchema(metadata.getSchema).validate()) >> - // successfully parsed the schema locally means that the client was not properly configured - Sync[F].raiseError[ParsedSchema]( - new RuntimeException( - "Please enable JSON support in SchemaRegistryClientSettings by using withJsonSchemaSupport" - ) - ) - } - } - } - - private def lookupLatestVersion( - subject: String, - schema: ParsedSchema, - cache: Ref[F, Map[SubjectSchema, ParsedSchema]], - latestCompatStrict: Boolean - ): F[ParsedSchema] = { - val ss = SubjectSchema(subject, schema) - for { - map <- cache.get - result = map.get(ss) - latestVersion <- result match { - case Some(cached) => Sync[F].pure(cached) - case None => fetchLatest(subject) - } - // the latest version must be backwards compatible with the current schema - // this does not test forward compatibility to allow unions - compatIssues = latestVersion.isBackwardCompatible(schema) - _ <- Sync[F].whenA(latestCompatStrict && !compatIssues.isEmpty) { - Sync[F].raiseError( - new IOException(s"Incompatible schema: ${compatIssues.toArray.mkString(", ")}") - ) - } - _ <- Sync[F].whenA(result.isEmpty)(cache.update(_ + (ss -> latestVersion))) - } yield latestVersion - } -} + envelopeMode: Boolean + )(using p: Pickler[A], sync: Sync[F]): Resource[F, Serializer[F, A]] = + Resource + .make(sync.delay(KafkaJsonSchemaSerializer[JsonNode](client, confluentConfig)))(client => + sync.delay(client.close()) + ) + .evalTap: client => + 
sync.delay(client.configure(confluentConfig, isKey))
+      .evalMap: underlying =>
+        JsonSchemaSerializer(underlying, mapper, envelopeMode).serializer
+
+private class JsonSchemaSerializer[F[_], A](
+    underlying: KafkaJsonSchemaSerializer[JsonNode],
+    mapper: ObjectMapper,
+    envelopeMode: Boolean
+)(using
+    sync: Sync[F],
+    pickler: Pickler[A]
+):
+  import pickler.innerUpickle.*
+  private given Writer[A] = pickler.innerUpickle.writer
+
+  private val makeJsonSchema: F[JsonSchema] =
+    val tapirSchema = pickler.schema
+    val tapirJsonSchema =
+      TapirSchemaToJsonSchema(tapirSchema, markOptionsAsNullable = false, metaSchema = MetaSchemaDraft04)
+    sync.delay:
+      JsonSchema:
+        tapirJsonSchema.asJson.deepDropNullValues.noSpaces
+
+  def serializer: F[Serializer[F, A]] =
+    makeJsonSchema
+      .map: jsonSchema =>
+        Serializer
+          .delegate[F, JsonNode](underlying)
+          .contramap[A]: value =>
+            val str = write(value)
+            val node = mapper.readValue(str, classOf[JsonNode])
+            if envelopeMode then JsonSchemaUtils.envelope(jsonSchema, node)
+            else node
+      .suspend
diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
index 264910d..f257f51 100644
--- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
+++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
@@ -1,42 +1,120 @@
 package io.kaizensolutions.jsonschema
 
+import cats.effect.Resource
+import cats.effect.Sync
+import com.fasterxml.jackson.databind.ObjectMapper
+import fs2.kafka.*
+import io.confluent.kafka.schemaregistry.SchemaProvider
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider
+import io.confluent.kafka.schemaregistry.json.SpecificationVersion
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.*
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig.*
+import sttp.tapir.json.pickler.Pickler
 
-object JsonSchemaSerializerSettings {
+import java.util.Locale
+import scala.jdk.CollectionConverters.*
+
+object JsonSchemaSerializerSettings:
   val default: JsonSchemaSerializerSettings = JsonSchemaSerializerSettings()
-}
-
-/**
- * Settings that describe how to interact with Confluent's Schema Registry when
- * serializing data
- *
- * @param automaticRegistration
- *   dictates whether we try to automatically register the schema we have with
- *   the server
- * @param useLatestVersion
- *   dictates whether to use the latest schema on the server instead of
- *   registering a new one
- * @param validatePayload
- *   dictates whether to validate the JSON payload against the schema
- * @param latestCompatStrict
- *   dictates whether to use strict compatibility
- * @param jsonSchemaId
- *   is used to override the schema ID of the data that is being produced
- */
+
 final case class JsonSchemaSerializerSettings(
-  automaticRegistration: Boolean = true,
+  schemaRegistryUrl: String = "http://localhost:8081",
+  autoRegisterSchema: Boolean = true,
   useLatestVersion: Boolean = false,
-  validatePayload: Boolean = false,
-  latestCompatStrict: Boolean = true,
-  jsonSchemaId: Option[String] = None
-) { self =>
-  def withSchemaId(id: String): JsonSchemaSerializerSettings =
-    self.copy(jsonSchemaId = Some(id))
+  failOnUnknownProperties: Boolean = true,
+  failOnInvalidSchema: Boolean = 
false, + writeDatesAsISO8601: Boolean = false, + jsonSchemaSpec: JsonSchemaVersion = JsonSchemaVersion.default, + oneOfForNullables: Boolean = true, + jsonIndentOutput: Boolean = false, + envelopeMode: Boolean = true, + cacheCapacity: Int = 1024, + client: Option[SchemaRegistryClient] = None, + mapper: Option[ObjectMapper] = None +): + def withSchemaRegistryUrl(url: String): JsonSchemaSerializerSettings = + copy(schemaRegistryUrl = url) + + def withAutoRegisterSchema(b: Boolean): JsonSchemaSerializerSettings = + copy(autoRegisterSchema = b) def withUseLatestVersion(b: Boolean): JsonSchemaSerializerSettings = - self.copy(useLatestVersion = b) + copy(useLatestVersion = b) + + def withFailOnUnknownProperties(b: Boolean): JsonSchemaSerializerSettings = + copy(failOnUnknownProperties = b) + + def withFailOnInvalidSchema(b: Boolean): JsonSchemaSerializerSettings = + copy(failOnInvalidSchema = b) + + def withWriteDatesAsISO8601(b: Boolean): JsonSchemaSerializerSettings = + copy(writeDatesAsISO8601 = b) + + def withJsonSchemaSpec(spec: JsonSchemaVersion): JsonSchemaSerializerSettings = + copy(jsonSchemaSpec = spec) + + def withOneOfForNullables(b: Boolean): JsonSchemaSerializerSettings = + copy(oneOfForNullables = b) + + def withJsonIndentOutput(b: Boolean): JsonSchemaSerializerSettings = + copy(jsonIndentOutput = b) + + def withCacheCapacity(value: Int): JsonSchemaSerializerSettings = + copy(cacheCapacity = value) + + def withClient(client: SchemaRegistryClient): JsonSchemaSerializerSettings = + copy(client = Option(client)) + + def withMapper(mapper: ObjectMapper): JsonSchemaSerializerSettings = + copy(mapper = Option(mapper)) + + def forKey[F[_], A](using Pickler[A], Sync[F]): Resource[F, KeySerializer[F, A]] = + create(isKey = true) + + def forValue[F[_], A](using Pickler[A], Sync[F]): Resource[F, ValueSerializer[F, A]] = + create(isKey = false) + + private def config: Map[String, Any] = Map( + SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl, + AUTO_REGISTER_SCHEMAS -> autoRegisterSchema, + USE_LATEST_VERSION -> useLatestVersion, + FAIL_INVALID_SCHEMA -> failOnInvalidSchema, + FAIL_UNKNOWN_PROPERTIES -> failOnUnknownProperties, + WRITE_DATES_AS_ISO8601 -> writeDatesAsISO8601.toString, + SCHEMA_SPEC_VERSION -> jsonSchemaSpec.toConfluentConfig, + JSON_INDENT_OUTPUT -> jsonIndentOutput.toString + ) + + private def create[F[_], A](isKey: Boolean)(using p: Pickler[A], sync: Sync[F]): Resource[F, Serializer[F, A]] = + val configuredMapper = mapper.getOrElse(new ObjectMapper()) + val confluentConfig = config.asJava + client + .map(Resource.pure[F, SchemaRegistryClient]) + .getOrElse: + val providers: java.util.List[SchemaProvider] = List(new JsonSchemaProvider()).asJava + val acquire = sync.delay( + new CachedSchemaRegistryClient(schemaRegistryUrl, cacheCapacity, providers, confluentConfig) + ) + val release = (client: SchemaRegistryClient) => sync.delay(client.close()) + Resource.make(acquire)(release) + .flatMap: client => + JsonSchemaSerializer.create(isKey, confluentConfig, configuredMapper, client, envelopeMode) + +enum JsonSchemaVersion: + self => + case Draft4, Draft6, Draft7, Draft2019, Draft2020 - def withAutomaticRegistration(b: Boolean): JsonSchemaSerializerSettings = - self.copy(automaticRegistration = b) + def toConfluentConfig: String = + val confluentEnum = self match + case Draft4 => SpecificationVersion.DRAFT_4 + case Draft6 => SpecificationVersion.DRAFT_6 + case Draft7 => SpecificationVersion.DRAFT_7 + case Draft2019 => SpecificationVersion.DRAFT_2019_09 + case Draft2020 => 
SpecificationVersion.DRAFT_2020_12 + confluentEnum.name().toLowerCase(Locale.ROOT) - def withStrictLatestCompatibility(b: Boolean): JsonSchemaSerializerSettings = - self.copy(latestCompatStrict = b) -} +object JsonSchemaVersion: + // NOTE: This is what Tapir currently supports + val default: JsonSchemaVersion = Draft4 diff --git a/src/main/scala/io/kaizensolutions/jsonschema/package.scala b/src/main/scala/io/kaizensolutions/jsonschema/package.scala deleted file mode 100644 index 55378be..0000000 --- a/src/main/scala/io/kaizensolutions/jsonschema/package.scala +++ /dev/null @@ -1,44 +0,0 @@ -package io.kaizensolutions - -import cats.effect.Sync -import fs2.kafka.vulcan.SchemaRegistryClientSettings -import io.confluent.kafka.schemaregistry.SchemaProvider -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient -import io.confluent.kafka.schemaregistry.json.{JsonSchema, JsonSchemaProvider} - -import scala.jdk.CollectionConverters.* -import scala.reflect.ClassTag - -package object jsonschema { - implicit class ClientSchemaRegistrySyntax[F[_]: Sync](client: SchemaRegistryClientSettings[F]) { - def withJsonSchemaSupport: SchemaRegistryClientSettings[F] = - client.withCreateSchemaRegistryClient((baseUrl, maxCacheSize, properties) => - Sync[F].delay( - new CachedSchemaRegistryClient( - List(baseUrl).asJava, - maxCacheSize, - // Avro is present by default and we add JSON Schema support - List[SchemaProvider](new AvroSchemaProvider(), new JsonSchemaProvider()).asJava, - properties.asJava - ) - ) - ) - } - - def toJsonSchema[F[_]: Sync, T](schema: json.Schema[T], schemaId: Option[String] = None)(implicit - tag: ClassTag[T] - ): F[JsonSchema] = { - import com.github.andyglow.jsonschema.* - - Sync[F].delay { - val instance = new JsonSchema( - schema.draft07( - schemaId.getOrElse(tag.runtimeClass.getSimpleName.toLowerCase + "schema.json") - ) - ) - instance.validate() - instance - } - } -} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 1067b24..ee6b234 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -9,7 +9,7 @@ - + diff --git a/src/test/scala/io/kaizensolutions/jsonschema/JsonSchemaSerDesSpec.scala b/src/test/scala/io/kaizensolutions/jsonschema/JsonSchemaSerDesSpec.scala index 5b3d2c7..04ab647 100644 --- a/src/test/scala/io/kaizensolutions/jsonschema/JsonSchemaSerDesSpec.scala +++ b/src/test/scala/io/kaizensolutions/jsonschema/JsonSchemaSerDesSpec.scala @@ -3,29 +3,31 @@ package io.kaizensolutions.jsonschema import cats.effect.* import cats.syntax.all.* import com.dimafeng.testcontainers.DockerComposeContainer.ComposeFile -import com.dimafeng.testcontainers.munit.TestContainersForAll import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService} +import com.dimafeng.testcontainers.munit.TestContainersForAll import fs2.Stream import fs2.kafka.* -import fs2.kafka.vulcan.SchemaRegistryClientSettings -import io.circe.generic.semiauto.* -import io.circe.{Codec, Decoder, Encoder} -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException -import json.schema.description -import _root_.json.{Json, Schema} +import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient} +import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider +import io.confluent.kafka.schemaregistry.{CompatibilityLevel, 
SchemaProvider} import munit.CatsEffectSuite +import org.apache.kafka.common.errors.{ + InvalidConfigurationException, + SerializationException as UnderlyingSerializationException +} +import sttp.tapir.Schema.annotations.* +import sttp.tapir.json.pickler.Pickler -import java.io.{File, IOException} +import java.io.File import scala.concurrent.duration.DurationInt -import scala.reflect.ClassTag +import scala.jdk.CollectionConverters.* -class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { +class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll: test( "JsonSchemaSerialization will automatically register the JSON Schema and allow you to send JSON data to Kafka" ) { val examplePersons = List.fill(100)(PersonV1("Bob", 40, List(Book("Bob the builder", 1337)))) - val serSettings = JsonSchemaSerializerSettings.default.withAutomaticRegistration(true) + val serSettings = JsonSchemaSerializerSettings.default producerTest[IO, PersonV1]( schemaRegistry[IO], serSettings, @@ -38,7 +40,7 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { test("Enabling use latest (and disabling auto-registration) without configuring the client will fail") { val examplePersons = List.fill(100)(PersonV1("Bob", 40, List(Book("Bob the builder", 1337)))) - val serSettings = JsonSchemaSerializerSettings.default.withAutomaticRegistration(false).withUseLatestVersion(true) + val serSettings = JsonSchemaSerializerSettings.default.withAutoRegisterSchema(false).withUseLatestVersion(true) producerTest[IO, PersonV1]( noJsonSupportSchemaRegistry[IO], serSettings, @@ -46,17 +48,16 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { examplePersons, _.name, result => - interceptMessageIO[RuntimeException]( - "Please enable JSON support in SchemaRegistryClientSettings by using withJsonSchemaSupport" - )(result) + interceptIO[RuntimeException](result) + .map: t => + assert(t.getCause.getMessage.contains("Invalid schema")) ) } test("Attempting to publish an incompatible change with auto-registration will fail") { val settings = JsonSchemaSerializerSettings.default - .withAutomaticRegistration(true) - .withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") + .withAutoRegisterSchema(true) val topic = "example-topic-persons" @@ -70,7 +71,7 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { examplePersons, _.name, result => - interceptIO[RestClientException](result) + interceptIO[InvalidConfigurationException](result) .map(_.getMessage.startsWith("Schema being registered is incompatible with an earlier schema")) ) } @@ -80,9 +81,8 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { ) { val settings = JsonSchemaSerializerSettings.default - .withAutomaticRegistration(false) + .withAutoRegisterSchema(false) .withUseLatestVersion(true) - .withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") val topic = "example-topic-persons" @@ -95,7 +95,12 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { topic, examplePersons, _.name, - result => interceptIO[IOException](result).map(_.getMessage.startsWith("Incompatible schema")) + result => + interceptIO[UnderlyingSerializationException](result).map: t => + val message = t.getCause.getMessage + message.startsWith("Incompatible schema") && message.endsWith( + "Set latest.compatibility.strict=false to disable this check" + ) ) } @@ -104,9 +109,7 @@ class JsonSchemaSerDesSpec 
extends CatsEffectSuite with TestContainersForAll { ) { val settings = JsonSchemaSerializerSettings.default - .withAutomaticRegistration(false) - .withUseLatestVersion(false) - .withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") + .withAutoRegisterSchema(false) val topic = "example-topic-persons" @@ -120,17 +123,16 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { examplePersons, _.name, result => - interceptMessageIO[RestClientException]( - """Schema not found; error code: 40403""" - )(result) + interceptIO[UnderlyingSerializationException](result) + .map: exception => + exception.getCause.getMessage.contains("Schema not found; error code: 40403") ) } - test("Publishing a compatible change with auto-registration is allowed") { + test("Publishing a forward compatible change with auto-registration is allowed (in forward-compatibility mode)") { val settings = JsonSchemaSerializerSettings.default - .withAutomaticRegistration(true) - .withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") + .withAutoRegisterSchema(true) val topic = "example-topic-persons" @@ -146,12 +148,13 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { ) producerTest[IO, PersonV2Good]( - schemaRegistry[IO], - settings, - topic, - examplePersons, - _.name, - result => assertIO(result, examplePersons) + compatibilityMode = Option(CompatibilityLevel.FORWARD), + client = schemaRegistry[IO], + settings = settings, + topic = topic, + input = examplePersons, + key = _.name, + assertion = result => assertIO(result, examplePersons) ) } @@ -159,8 +162,6 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { "Reading data back from the topic with the latest schema is allowed provided you compensate for missing fields in your Decoder" ) { val settings = JsonSchemaDeserializerSettings.default - .withJsonSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") - .withAggressiveValidation(true) val result: IO[(Boolean, Boolean)] = consumeFromKafka[IO, PersonV2Good]( @@ -182,8 +183,6 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { test("Reading data back from the topic with an older schema is allowed") { val settings = JsonSchemaDeserializerSettings.default - .withJsonSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json") - .withPayloadValidationAgainstServerSchema(true) val result: IO[Long] = consumeFromKafka[IO, PersonV1]( @@ -197,19 +196,28 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { assertIO(result, 200L) } - def producerTest[F[_]: Async, A: Encoder: json.Schema: ClassTag]( - fClient: F[SchemaRegistryClient], + def producerTest[F[_]: Async, A: Pickler]( + client: Resource[F, SchemaRegistryClient], settings: JsonSchemaSerializerSettings, topic: String, input: List[A], key: A => String, - assertion: F[List[A]] => F[Any] + assertion: F[List[A]] => F[Any], + compatibilityMode: Option[CompatibilityLevel] = None ): F[Any] = { val produceElements: F[List[A]] = Stream - .eval[F, SchemaRegistryClient](fClient) - .evalMap(JsonSchemaSerializer.forValue[F, A](settings, _)) - .flatMap(implicit serializer => kafkaProducer[F, String, A]) + .resource[F, SchemaRegistryClient](client) + .evalTap: client => + compatibilityMode.fold(ifEmpty = ().pure[F]): newCompat => + val subject = s"$topic-value" // change accordingly if you use isKey + Async[F].delay: + client.updateCompatibility(subject, newCompat.name) + 
.map(settings.withClient) + .flatMap(settings => Stream.resource(settings.forValue[F, A])) + .flatMap: serializer => + given ValueSerializer[F, A] = serializer + kafkaProducer[F, String, A] .flatMap { kafkaProducer => Stream .emits[F, A](input) @@ -231,17 +239,20 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { assertion(produceElements) } - def consumeFromKafka[F[_]: Async, A: Decoder: json.Schema: ClassTag]( - fClient: F[SchemaRegistryClient], + def consumeFromKafka[F[_]: Async, A: Pickler]( + client: Resource[F, SchemaRegistryClient], settings: JsonSchemaDeserializerSettings, groupId: String, topic: String, numberOfElements: Long ): Stream[F, A] = Stream - .eval(fClient) - .evalMap(client => JsonSchemaDeserializer.forValue[F, A](settings, client)) - .flatMap(implicit des => kafkaConsumer[F, Option[String], A](groupId)) + .resource(client) + .map(settings.withClient) + .flatMap(settings => Stream.resource(settings.forValue[F, A])) + .flatMap: des => + given ValueDeserializer[F, A] = des + kafkaConsumer[F, Option[String], A](groupId) .evalTap(_.subscribeTo(topic)) .flatMap(_.stream) .map(_.record.value) @@ -260,7 +271,7 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { ) .start() - def kafkaProducer[F[_]: Async, K, V](implicit + def kafkaProducer[F[_]: Async, K, V](using keySerializer: KeySerializer[F, K], valueSerializer: ValueSerializer[F, V] ): Stream[F, KafkaProducer[F, K, V]] = { @@ -280,64 +291,51 @@ class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll { KafkaConsumer.stream(settings) } - def schemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] = - SchemaRegistryClientSettings("http://localhost:8081").withJsonSchemaSupport.createSchemaRegistryClient + def schemaRegistry[F[_]](using sync: Sync[F]): Resource[F, SchemaRegistryClient] = + val providers: java.util.List[SchemaProvider] = List(new JsonSchemaProvider()).asJava + val acquire = Sync[F].delay: + new CachedSchemaRegistryClient("http://localhost:8081", 1024, providers, Map.empty.asJava) + val release = (client: SchemaRegistryClient) => sync.delay(client.close()) + Resource.make(acquire)(release) - def noJsonSupportSchemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] = - SchemaRegistryClientSettings("http://localhost:8081").createSchemaRegistryClient -} + def noJsonSupportSchemaRegistry[F[_]](using sync: Sync[F]): Resource[F, SchemaRegistryClient] = + Resource.make( + Sync[F].delay: + new CachedSchemaRegistryClient("http://localhost:8081", 1024) + )(client => sync.delay(client.close())) -object Book { - implicit val bookJsonSchema: Schema[Book] = Json.schema[Book] - implicit val bookCodec: Codec[Book] = deriveCodec[Book] -} final case class Book( @description("name of the book") name: String, @description("international standard book number") isbn: Int ) +object Book: + given Pickler[Book] = Pickler.derived -object PersonV1 { - implicit val personJsonSchema: Schema[PersonV1] = Json.schema[PersonV1] - implicit val personCodec: Codec[PersonV1] = deriveCodec[PersonV1] -} final case class PersonV1( @description("name of the person") name: String, @description("age of the person") age: Int, @description("A list of books that the person has read") books: List[Book] ) +object PersonV1: + given Pickler[PersonV1] = Pickler.derived // V2 is backwards incompatible with V1 because the key has changed -object PersonV2Bad { - implicit val personV2BadJsonSchema: Schema[PersonV2Bad] = Json.schema[PersonV2Bad] - implicit val personV2BadCodec: Codec[PersonV2Bad] 
= deriveCodec[PersonV2Bad] -} final case class PersonV2Bad( @description("name of the person") name: String, @description("age of the person") age: Int, @description("A list of books that the person has read") booksRead: List[Book] ) +object PersonV2Bad: + given Pickler[PersonV2Bad] = Pickler.derived -object PersonV2Good { - implicit val personV2GoodJsonSchema: Schema[PersonV2Good] = Json.schema[PersonV2Good] - implicit val personV2GoodCodec: Codec[PersonV2Good] = { - val encoder: Encoder[PersonV2Good] = deriveEncoder[PersonV2Good] - - val decoder: Decoder[PersonV2Good] = cursor => - for { - name <- cursor.downField("name").as[String] - age <- cursor.downField("age").as[Int] - books <- cursor.downField("books").as[List[Book]] - hobbies <- cursor.downField("hobbies").as[Option[List[String]]] // account for missing hobbies - optField <- cursor.downField("optionalField").as[Option[String]] - } yield PersonV2Good(name, age, books, hobbies.getOrElse(Nil), optField) - - Codec.from(decoder, encoder) - } -} final case class PersonV2Good( @description("name of the person") name: String, @description("age of the person") age: Int, @description("A list of books that the person has read") books: List[Book], - @description("A list of hobbies") hobbies: List[String] = Nil, - @description("An optional field to add extra information") optionalField: Option[String] + @description("A list of hobbies") @default(Nil) + hobbies: List[String], + @description("An optional field to add extra information") @default(Option.empty[String]) + optionalField: Option[String] ) +object PersonV2Good: + given Pickler[PersonV2Good] = Pickler.derived
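
Usage sketch, not part of the applied diff: the reworked API above is Resource-based, with a single Pickler supplying both the JSON codec and the Tapir schema that is converted to a draft-04 JSON Schema for the registry. A minimal wiring under those assumptions follows; the Device type, registry URL, and settings values are illustrative, not taken from the patch.

import cats.effect.{IO, Resource}
import fs2.kafka.{ValueDeserializer, ValueSerializer}
import io.kaizensolutions.jsonschema.{JsonSchemaDeserializerSettings, JsonSchemaSerializerSettings}
import sttp.tapir.json.pickler.Pickler

final case class Device(name: String, firmware: Int)
object Device:
  // one derived Pickler drives both serialization and schema generation
  given Pickler[Device] = Pickler.derived

// acquiring a serde also acquires (and later closes) the underlying
// Confluent schema-registry client, unless one is supplied via withClient
val serializer: Resource[IO, ValueSerializer[IO, Device]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081") // illustrative URL
    .withAutoRegisterSchema(true)
    .forValue[IO, Device]

val deserializer: Resource[IO, ValueDeserializer[IO, Device]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[IO, Device]

The resulting serializer and deserializer plug straight into fs2-kafka producer and consumer settings, as the test suite above does.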
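A second sketch on the evolution scenario the PersonV2Good test exercises: when a consumer's model adds fields relative to records already on the topic, the added fields carry @default annotations so the derived Pickler can fill them in while decoding older payloads, and so the generated JSON Schema remains backward compatible. The CustomerV2 model below is a hypothetical stand-in for the test's PersonV2Good.

import sttp.tapir.Schema.annotations.{default, description}
import sttp.tapir.json.pickler.Pickler

final case class CustomerV2(
  @description("name of the customer") name: String,
  @description("signup year") year: Int,
  @default(Nil) tags: List[String],                    // absent in V1 records
  @default(Option.empty[String]) notes: Option[String] // absent in V1 records
)
object CustomerV2:
  given Pickler[CustomerV2] = Pickler.derived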