diff --git a/README.md b/README.md
index 00eeb3f..1dc55ce 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,148 @@
 This functionality is backed by the following libraries:
 - [fs2-kafka & fs2-kafka-vulcan](https://github.com/fd4s/fs2-kafka) which provides the serializers and deserializers interfaces that we implement along with the Schema Registry client that we enrich
 - [confluent-schema-registry](https://github.com/confluentinc/schema-registry) is used as a basis for implementation and small portions are used for JSON Schema validation
+### Usage ###
+
+1. Define your data types:
+```scala
+object Book {}
+final case class Book(
+  name: String,
+  isbn: Int
+)
+
+object Person {}
+final case class Person(
+  name: String,
+  age: Int,
+  books: List[Book]
+)
+```
+
+2. Derive JSON Schemas for your case classes and add extra JSON Schema information using `scala-jsonschema`:
+```scala
+import json.schema.description
+import json.{Json, Schema}
+
+object Book {
+  implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
+}
+final case class Book(
+  @description("name of the book") name: String,
+  @description("international standard book number") isbn: Int
+)
+
+object Person {
+  implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
+}
+final case class Person(
+  @description("name of the person") name: String,
+  @description("age of the person") age: Int,
+  @description("a list of books that the person has read") books: List[Book]
+)
+```
+
+3. Use `circe` to derive Encoders & Decoders (or Codecs) for your data types:
+```scala
+import io.circe.generic.semiauto._
+import io.circe.Codec
+import json.schema.description
+import json.{Json, Schema}
+
+object Book {
+  implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
+  implicit val bookCodec: Codec[Book] = deriveCodec[Book]
+}
+final case class Book(
+  @description("name of the book") name: String,
+  @description("international standard book number") isbn: Int
+)
+
+object Person {
+  implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
+  implicit val personCodec: Codec[Person] = deriveCodec[Person]
+}
+final case class Person(
+  @description("name of the person") name: String,
+  @description("age of the person") age: Int,
+  @description("a list of books that the person has read") books: List[Book]
+)
+```
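+
+**Tip:** a quick way to sanity-check the derived codecs is a JSON round-trip (the values below are purely illustrative):
+```scala
+import io.circe.parser.decode
+import io.circe.syntax._
+
+val person = Person("Jane Doe", 42, List(Book("A Book", 12345)))
+val json   = person.asJson.noSpaces
+// decoding uses the same derived Codec, so this yields Right(person)
+assert(decode[Person](json) == Right(person))
+```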
+
+4. Instantiate and configure the Schema Registry client:
+```scala
+import cats.effect._
+import fs2.kafka.vulcan.SchemaRegistryClientSettings
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.kaizensolutions.jsonschema._
+
+def schemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] =
+  SchemaRegistryClientSettings[F]("http://localhost:8081")
+    .withJsonSchemaSupport
+    .createSchemaRegistryClient
+```
+
+5. Configure your FS2 Kafka producers and consumers to pull in the serializers and deserializers implicitly (or do this process manually):
+```scala
+import cats.effect._
+import fs2.Stream
+import fs2.kafka._
+
+def kafkaProducer[F[_]: Async, K, V](implicit
+  keySerializer: Serializer[F, K],
+  valueSerializer: Serializer[F, V]
+): Stream[F, KafkaProducer[F, K, V]] = {
+  val settings: ProducerSettings[F, K, V] =
+    ProducerSettings[F, K, V].withBootstrapServers("localhost:9092")
+  KafkaProducer.stream(settings)
+}
+
+def kafkaConsumer[F[_]: Async, K, V](groupId: String)(implicit
+  keyDeserializer: Deserializer[F, K],
+  valueDeserializer: Deserializer[F, V]
+): Stream[F, KafkaConsumer[F, K, V]] = {
+  val settings = ConsumerSettings[F, K, V]
+    .withBootstrapServers("localhost:9092")
+    .withGroupId(groupId)
+    .withAutoOffsetReset(AutoOffsetReset.Earliest)
+  KafkaConsumer.stream(settings)
+}
+```
+**Note:** In some cases you will need to adjust the Decoder to account for missing data (e.g. by providing defaults for fields that older records may not contain).
+
+6. Produce and consume data with automatic Confluent Schema Registry support:
+```scala
+import cats.effect._
+import fs2._
+import fs2.kafka._
+import json._
+import io.circe._
+import io.kaizensolutions.jsonschema._
+import scala.reflect.ClassTag
+
+def jsonSchemaProducer[F[_]: Async, A: Encoder: json.Schema: ClassTag](
+  settings: JsonSchemaSerializerSettings
+): Stream[F, KafkaProducer[F, String, A]] =
+  Stream
+    .eval[F, SchemaRegistryClient](schemaRegistry[F])
+    .evalMap(schemaRegistryClient => JsonSchemaSerializer[F, A](settings, schemaRegistryClient))
+    .evalMap(_.forValue)
+    .flatMap(implicit serializer => kafkaProducer[F, String, A])
+
+def jsonSchemaConsumer[F[_]: Async, A: Decoder: json.Schema: ClassTag](
+  settings: JsonSchemaDeserializerSettings,
+  groupId: String
+): Stream[F, KafkaConsumer[F, String, A]] =
+  Stream
+    .eval(schemaRegistry[F])
+    .evalMap(client => JsonSchemaDeserializer[F, A](settings, client))
+    .flatMap(implicit des => kafkaConsumer[F, String, A](groupId))
+```
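+
+For example, publishing a single `Person` record end-to-end might look like this (a minimal sketch reusing the definitions above; the topic name `"people"` and the helper name `publishPerson` are illustrative):
+```scala
+def publishPerson[F[_]: Async](person: Person): Stream[F, Unit] =
+  jsonSchemaProducer[F, Person](JsonSchemaSerializerSettings.default)
+    .evalMap { producer =>
+      // the outer effect enqueues the record; the inner effect waits for broker acknowledgement
+      producer.produce(ProducerRecords.one(ProducerRecord("people", person.name, person)))
+    }
+    .evalMap(identity)
+    .map(_ => ())
+```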
+
+### Settings ###
+A number of settings control how data is serialized and deserialized.
+Please check `JsonSchemaDeserializerSettings` and `JsonSchemaSerializerSettings` for more information.
+The `default` settings work great unless you need fine-grained control.
+
 ### Notes ###
-- Please note that this is only an initial design to prove the functionality and I'm very happy to integrate this back into FS2 Kafka (and other Kafka libraries) so please submit an issue and we can take it from there
+- Please note that this is only an initial design to prove the functionality, and I'm very happy to integrate this back into FS2 Kafka (and other Kafka libraries), so please submit an issue and we can take it from there
 - This library provides additional validation checks for the Deserialization side on top of what Confluent provides in their Java JSON Schema Deserializer
diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala
index ee6a6fa..c106042 100644
--- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala
+++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
 object JsonSchemaDeserializerSettings {
   val default: JsonSchemaDeserializerSettings = JsonSchemaDeserializerSettings()
 }
+
+/**
+ * Settings that describe how to interact with Confluent's Schema Registry when deserializing data
+ *
+ * @param validatePayloadAgainstServerSchema whether to validate the incoming payload against the schema on the server
+ * @param validatePayloadAgainstClientSchema whether to validate the incoming payload against the schema derived from the datatype you specify
+ * @param validateClientSchemaAgainstServer whether to validate the schema you specify against the server's schema
+ * @param failOnUnknownKeys whether to fail when unknown JSON keys are encountered
+ * @param jsonSchemaId overrides the schema ID of the data that is being consumed
+ */
 final case class JsonSchemaDeserializerSettings(
   validatePayloadAgainstServerSchema: Boolean = false,
   validatePayloadAgainstClientSchema: Boolean = false,
   validateClientSchemaAgainstServer: Boolean = false,
   failOnUnknownKeys: Boolean = false,
   jsonSchemaId: Option[String] = None
-) { self =>
+) { self =>
 
   def withPayloadValidationAgainstServerSchema(b: Boolean): JsonSchemaDeserializerSettings =
     self.copy(validatePayloadAgainstServerSchema = b)
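As an aside (not part of the patch), a minimal sketch of customizing these deserializer settings, using the `default` instance and the builder method shown above; `copy` covers fields whose builders are elided from this diff:

```scala
val strictDeserializerSettings: JsonSchemaDeserializerSettings =
  JsonSchemaDeserializerSettings.default
    .withPayloadValidationAgainstServerSchema(true)
    .copy(failOnUnknownKeys = true)
```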
diff --git a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
index 6b04aa4..f1a963f 100644
--- a/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
+++ b/src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
 object JsonSchemaSerializerSettings {
   val default: JsonSchemaSerializerSettings = JsonSchemaSerializerSettings()
 }
+
+/**
+ * Settings that describe how to interact with Confluent's Schema Registry when serializing data
+ *
+ * @param automaticRegistration whether to automatically register the schema we have with the server
+ * @param useLatestVersion whether to use the latest schema on the server instead of registering a new one
+ * @param validatePayload whether to validate the JSON payload against the schema
+ * @param latestCompatStrict whether compatibility checks against the latest server schema are strict
+ * @param jsonSchemaId overrides the schema ID of the data that is being produced
+ */
 final case class JsonSchemaSerializerSettings(
   automaticRegistration: Boolean = true,
   useLatestVersion: Boolean = false,
   validatePayload: Boolean = false,
   latestCompatStrict: Boolean = true,
   jsonSchemaId: Option[String] = None
-) { self =>
+) { self =>
 
   def withSchemaId(id: String): JsonSchemaSerializerSettings =
     self.copy(jsonSchemaId = Some(id))
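Similarly, as an aside (not part of the patch), a sketch of a producer configured to defer to the server's already-registered schema instead of auto-registering, using `copy` on the fields shown above:

```scala
val useServerSchemaSettings: JsonSchemaSerializerSettings =
  JsonSchemaSerializerSettings.default
    .copy(automaticRegistration = false, useLatestVersion = true)
```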