Commit 4066bc3: Add documentation
calvinlfer committed Jun 30, 2021 (1 parent: bb8b8b6)
Showing 3 changed files with 165 additions and 3 deletions.

README.md (143 additions, 1 deletion)
@@ -12,6 +12,148 @@ This functionality is backed by the following libraries:
- [fs2-kafka & fs2-kafka-vulcan](https://github.com/fd4s/fs2-kafka) which provides the serializers and deserializers interfaces that we implement along with the Schema Registry client that we enrich
- [confluent-schema-registry](https://github.com/confluentinc/schema-registry) is used as a basis for implementation and small portions are used for JSON Schema validation

### Usage ###

1. Define your data-types
```scala
object Book {}
final case class Book(
name: String,
isbn: Int
)

object Person {}
final case class Person(
name: String,
age: Int,
books: List[Book]
)
```

2. Derive JSON Schemas for your case classes and add extra JSON Schema information using `scala-jsonschema`
```scala
import json.schema.description
import json.{Json, Schema}

object Book {
implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
}
final case class Book(
@description("name of the book") name: String,
@description("international standard book number") isbn: Int
)

object Person {
implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
}
final case class Person(
@description("name of the person") name: String,
@description("age of the person") age: Int,
@description("A list of books that the person has read") books: List[Book]
)
```

3. Use `circe` to derive Encoders & Decoders (or Codecs) for your data-types:
```scala
import io.circe.generic.semiauto._
import io.circe.Codec
import json.schema.description
import json.{Json, Schema}

object Book {
implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
implicit val bookCodec: Codec[Book] = deriveCodec[Book]
}
final case class Book(
@description("name of the book") name: String,
@description("international standard book number") isbn: Int
)

object Person {
implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
implicit val personCodec: Codec[Person] = deriveCodec[Person]
}
final case class Person(
@description("name of the person") name: String,
@description("age of the person") age: Int,
@description("A list of books that the person has read") books: List[Book]
)
```

4. Instantiate and configure the Schema Registry client
```scala
import cats.effect._
import io.kaizensolutions.jsonschema._

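// SchemaRegistryClientSettings and the SchemaRegistryClient it builds come from
// fs2-kafka / fs2-kafka-vulcan (see the libraries listed above); withJsonSchemaSupport
// is the enrichment this library adds to the client settings.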
def schemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] =
SchemaRegistryClientSettings("http://localhost:8081")
.withJsonSchemaSupport
.createSchemaRegistryClient
```

5. Configure your FS2 Kafka Producers and Consumers to pull in Serializers and Deserializers implicitly (or wire them through manually)
```scala
import cats.effect._
import fs2.Stream
import fs2.kafka._

def kafkaProducer[F[_]: Async, K, V](implicit
keySerializer: Serializer[F, K],
valueSerializer: Serializer[F, V]
): Stream[F, KafkaProducer[F, K, V]] = {
val settings: ProducerSettings[F, K, V] =
ProducerSettings[F, K, V].withBootstrapServers("localhost:9092")
KafkaProducer.stream(settings)
}

def kafkaConsumer[F[_]: Async, K, V](groupId: String)(implicit
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
): Stream[F, KafkaConsumer[F, K, V]] = {
val settings = ConsumerSettings[F, K, V]
.withBootstrapServers("localhost:9092")
.withGroupId(groupId)
.withAutoOffsetReset(AutoOffsetReset.Earliest)
KafkaConsumer.stream(settings)
}
```
**Note:** In some cases you may need to adjust your circe `Decoder` to account for missing data, for example when a schema evolves and older payloads lack newly added fields; see the sketch below.
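A minimal sketch, assuming a hypothetical `PersonV2` that adds an `emails` field which older payloads will not carry:
```scala
import io.circe.Decoder

final case class PersonV2(name: String, age: Int, books: List[Book], emails: List[String])

object PersonV2 {
  // Fall back to an empty list when older payloads omit the "emails" key
  implicit val personV2Decoder: Decoder[PersonV2] = Decoder.instance { c =>
    for {
      name   <- c.downField("name").as[String]
      age    <- c.downField("age").as[Int]
      books  <- c.downField("books").as[List[Book]]
      emails <- c.downField("emails").as[Option[List[String]]].map(_.getOrElse(Nil))
    } yield PersonV2(name, age, books, emails)
  }
}
```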

6. Produce and consume data to and from Kafka with automatic Confluent Schema Registry support:
```scala
import cats.effect._
import fs2._
import fs2.kafka._
import json._
import io.circe._
import io.kaizensolutions.jsonschema._
import scala.reflect.ClassTag

def jsonSchemaProducer[F[_]: Async, A: Encoder: json.Schema: ClassTag](
settings: JsonSchemaSerializerSettings
): Stream[F, KafkaProducer[F, String, A]] =
Stream
.eval[F, SchemaRegistryClient](schemaRegistry[F])
.evalMap(schemaRegistryClient => JsonSchemaSerializer[F, A](settings, schemaRegistryClient))
.evalMap(_.forValue)
    .flatMap(implicit serializer => kafkaProducer[F, String, A])

def jsonSchemaConsumer[F[_]: Async, A: Decoder: json.Schema: ClassTag](
settings: JsonSchemaDeserializerSettings,
groupId: String
): Stream[F, KafkaConsumer[F, String, A]] =
Stream
.eval(schemaRegistry[F])
.evalMap(client => JsonSchemaDeserializer[F, A](settings, client))
.flatMap(implicit des => kafkaConsumer[F, String, A](groupId))
```
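
Putting the two together (a minimal sketch reusing the imports and helpers above; the topic name, group id, and sample values are illustrative assumptions):
```scala
import cats.syntax.all._ // for .flatten and .void

def produceAndConsumeExample[F[_]: Async]: Stream[F, Unit] = {
  val person = Person("Calvin", 30, List(Book("Functional Programming in Scala", 1234567)))

  // Produce a single record using the schema-aware serializer
  val produce: Stream[F, Unit] =
    jsonSchemaProducer[F, Person](JsonSchemaSerializerSettings.default)
      .evalMap { producer =>
        val record = ProducerRecord("people", "person-1", person)
        producer.produce(ProducerRecords.one(record)).flatten.void
      }

  // Consume the records back, deserializing and validating along the way
  val consume: Stream[F, Unit] =
    jsonSchemaConsumer[F, Person](JsonSchemaDeserializerSettings.default, groupId = "readme-example")
      .evalTap(_.subscribeTo("people"))
      .flatMap(_.stream)
      .evalMap(committable => Sync[F].delay(println(committable.record.value)))

  produce ++ consume
}
```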

### Settings ###
A number of settings control how data is serialized and deserialized; see `JsonSchemaDeserializerSettings` and
`JsonSchemaSerializerSettings` for the full list. The `default` settings work well unless you need fine-grained control.
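
Each setting can be adjusted through the fluent `with*` builder methods on the settings classes (a sketch using the builders shown in the diffs below):
```scala
import io.kaizensolutions.jsonschema._

// Reject payloads that do not conform to the schema registered on the server
val deserializerSettings: JsonSchemaDeserializerSettings =
  JsonSchemaDeserializerSettings.default
    .withPayloadValidationAgainstServerSchema(true)

// Pin the serializer to an explicit schema ID instead of the derived one
val serializerSettings: JsonSchemaSerializerSettings =
  JsonSchemaSerializerSettings.default
    .withSchemaId("my-schema-id")
```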

### Notes ###
- Please note that this is only an initial design to prove the functionality. I'm very happy to integrate this back into FS2 Kafka (and other Kafka libraries), so please submit an issue and we can take it from there.
- This library performs additional validation checks on the deserialization side, on top of what Confluent provides in their Java JSON Schema Deserializer.
JsonSchemaDeserializerSettings.scala
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
object JsonSchemaDeserializerSettings {
val default: JsonSchemaDeserializerSettings = JsonSchemaDeserializerSettings()
}

/**
* Settings that describe how to interact with Confluent's Schema Registry when deserializing data
*
 * @param validatePayloadAgainstServerSchema validates the payload against the schema stored on the server
 * @param validatePayloadAgainstClientSchema validates the payload against the schema derived from the datatype you specify
 * @param validateClientSchemaAgainstServer validates the schema you specify against the server's schema
 * @param failOnUnknownKeys fails deserialization when unknown JSON keys are encountered
 * @param jsonSchemaId overrides the schema ID of the data that is being consumed
*/
final case class JsonSchemaDeserializerSettings(
validatePayloadAgainstServerSchema: Boolean = false,
validatePayloadAgainstClientSchema: Boolean = false,
validateClientSchemaAgainstServer: Boolean = false,
failOnUnknownKeys: Boolean = false,
jsonSchemaId: Option[String] = None
) { self =>
def withPayloadValidationAgainstServerSchema(b: Boolean): JsonSchemaDeserializerSettings =
self.copy(validatePayloadAgainstServerSchema = b)

JsonSchemaSerializerSettings.scala
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
object JsonSchemaSerializerSettings {
val default: JsonSchemaSerializerSettings = JsonSchemaSerializerSettings()
}

/**
* Settings that describe how to interact with Confluent's Schema Registry when serializing data
*
* @param automaticRegistration dictates whether we try to automatically register the schema we have with the server
* @param useLatestVersion dictates whether to use the latest schema on the server instead of registering a new one
* @param validatePayload dictates whether to validate the JSON payload against the schema
 * @param latestCompatStrict dictates whether compatibility checks against the latest server schema are strict
* @param jsonSchemaId is used to override the schema ID of the data that is being produced
*/
final case class JsonSchemaSerializerSettings(
automaticRegistration: Boolean = true,
useLatestVersion: Boolean = false,
validatePayload: Boolean = false,
latestCompatStrict: Boolean = true,
jsonSchemaId: Option[String] = None
) { self =>
def withSchemaId(id: String): JsonSchemaSerializerSettings =
self.copy(jsonSchemaId = Some(id))
