Testing Schema Registry: Spring Boot and Kafka

Introduction

Apache Kafka is a widely used, reliable, and scalable event streaming platform. It provides the foundation for fast, scalable, fault-tolerant, event-driven microservice architectures. Any type of data can be published to a Kafka topic and read back from it. However, Kafka does not provide schema validation out of the box, so nothing prevents junk data from being fed into a topic.

Confluent provides Schema Registry, which can be used to implement type checking of messages in any Kafka installation. A message schema is registered for a topic, and Schema Registry is then used to ensure that only messages conforming to that schema are sent to the topic.

Integrating Schema Registry with Spring Boot Kafka producer and consumer applications is not much of a challenge, thanks to the extensive support from Spring Boot and Apache Kafka. Testing the integration, however, often poses some challenges for the developer.

Since JSON is such a widespread data format, in this post I describe how to write tests for the Schema Registry for Apache Kafka using Spring Boot, MockSchemaRegistryClient, and EmbeddedKafka with JSON Schema.

Install Dependencies

Add the kafka-json-schema-serializer dependency from Confluent to pom.xml. It is served from Confluent's Maven repository, so that repository has to be declared as well.

<project>
    ...

    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-schema-serializer</artifactId>
            <version>6.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    ...
</project>

Setting Up the Test Infrastructure

To test the schema registry, we won’t require a full-fledged production instance of Kafka. Instead, we will use an embedded version of Kafka. To set up Embedded Kafka, annotate any @SpringBootTest class or any @TestConfiguration class with @EmbeddedKafka, providing the required parameters as shown below:

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:8092", "port=8092"})
@ActiveProfiles("test")
@DirtiesContext
public class KafkaTestSupport {


}

io.confluent:kafka-json-schema-serializer provides a mock implementation of Schema Registry client called MockSchemaRegistryClient that can be used to register and test out JSON schema. The io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer should be used as a value serializer for JSON schema-based events.

To set up a mock schema-registry server, the producer properties should be set as follows in application.yaml in your test resources.

spring:
  kafka:
    bootstrap-servers: localhost:8092 # should match with embedded broker host port
    producer:
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
    properties:
      schema.registry.url: mock://not-used
      auto.register.schemas: false
      use.latest.version: true
      json:
        fail.invalid.schema: true
        oneof.for.nullables: false
        write.dates.iso8601: true

A couple of points to note here:

  1. The mock Schema Registry URL must start with mock://.
  2. We set auto.register.schemas to false, since we will be populating and registering our JSON schema with the mock registry server beforehand. Setting this to false disallows auto-creation and registration of the schema using Jackson.
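
The first point can be illustrated in plain Java. This is only a sketch of the prefix rule, assuming that the text after mock:// acts as a name (scope) for the in-memory registry. MockRegistryUrl and scopeOf are hypothetical helpers written for this post, not part of the Confluent API.

```java
import java.util.Optional;

// Hypothetical helper: mirrors the "must start with mock://" rule described above.
final class MockRegistryUrl {

    private static final String MOCK_PREFIX = "mock://";

    // Returns the text after "mock://", or empty if the URL is not a mock URL.
    static Optional<String> scopeOf(String url) {
        if (url == null || !url.startsWith(MOCK_PREFIX)) {
            return Optional.empty();
        }
        return Optional.of(url.substring(MOCK_PREFIX.length()));
    }

    public static void main(String[] args) {
        System.out.println(scopeOf("mock://not-used"));       // prints Optional[not-used]
        System.out.println(scopeOf("http://localhost:8081")); // prints Optional.empty
    }
}
```

A URL such as http://localhost:8081 would be treated as a real registry endpoint, which is why the test configuration uses the mock:// form.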

Let’s consider our event model SampleEvent as follows:

public class SampleEvent {

    @JsonProperty(required = true)
    private Long id;

    private String name;

    @JsonProperty(required = true)
    private OffsetDateTime date;

    @JsonProperty(required = true)
    private List<SampleSubEvent> subEvents;

}

public class SampleSubEvent {

    @JsonProperty(required = true)
    private Long id;

    private String name;

}

Writing the Test Cases

As part of the configuration, we need to provide a SchemaRegistryClient bean of type MockSchemaRegistryClient and register the serializer to use this mock schema registry instead of an actual one. The ProducerFactory should also be registered to use this serializer. We will do these using a test configuration static inner class.

class SchemaRegistryTest extends KafkaTestSupport {


    @TestConfiguration
    public static class TestConfig<K, V> {

        @Autowired
        KafkaProperties properties;

        @Bean
        public SchemaRegistryClient schemaRegistryClient() throws RestClientException, IOException {
            SchemaProvider provider = new JsonSchemaProvider();
            return new MockSchemaRegistryClient(Collections.singletonList(provider));
        }

        @Bean
        public KafkaJsonSchemaSerializer<V> kafkaJsonSchemaSerializer(SchemaRegistryClient schemaRegistryClient)
                throws RestClientException, IOException {
            return new KafkaJsonSchemaSerializer<V>(schemaRegistryClient);
        }

        @Bean
        public ProducerFactory<K, V> producerFactory(Serializer<V> vSerializer) {
            return new DefaultKafkaProducerFactory<K, V>(properties.buildProducerProperties(), (Serializer<K>) new StringSerializer(),
                    vSerializer);
        }

    }

}

Once our mock Schema Registry has been set up, the next step is to generate a valid schema for the event model and register the schema with the mock Schema Registry. Following the default subject naming strategy, for a topic called test the corresponding subject name will be test-value. For more details on topics and subjects and on how the subject naming strategies work, I would suggest reading the Confluent documentation.
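
The topic-to-subject mapping can be sketched in a few lines of plain Java. This is an illustration of the default naming rule only; SubjectNames and subjectFor are hypothetical names, and the real logic lives in Confluent's subject naming strategy classes.

```java
// Hypothetical helper: reproduces the default "<topic>-key" / "<topic>-value" rule.
final class SubjectNames {

    // Derives the subject under which the key or value schema of a topic is registered.
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("test", false)); // prints test-value
        System.out.println(subjectFor("test", true));  // prints test-key
    }
}
```

Because the test registers the schema under test-value, the topic used when sending must be exactly test for the serializer to find it.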

class SchemaRegistryTest extends KafkaTestSupport {

    private static final String TEST_TOPIC = "test";

    private static final String SUBJECT = "test-value";

    private static final String SCHEMA_STR = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Sample Event\",\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"},\"date\":{\"type\":\"number\"},\"subEvents\":{\"type\":\"array\",\"items\":{\"$ref\":\"#/definitions/SampleSubEvent\"}}},\"required\":[\"id\",\"date\",\"subEvents\"],\"definitions\":{\"SampleSubEvent\":{\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}},\"required\":[\"id\"]}}}";

    private static final String SOME_INVALID_SCHEMA_STRING = "some_junk";


    @Autowired
    private KafkaTemplate<String, SampleEvent> template;

    @Autowired
    private SchemaRegistryClient schemaRegistryClient;


    @Test
    void should_publish_successfully_on_valid_schema() throws RestClientException, IOException {

        // parse and register the schema
        Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SCHEMA_STR,
                Collections.emptyList());

        if(parsedSchema.isPresent()){
            ParsedSchema schema = parsedSchema.get();
            schemaRegistryClient.register(SUBJECT, schema,1,1);
        }

        SampleEvent e = new SampleEvent();

        // send and assert that no exceptions are thrown
        Assertions.assertDoesNotThrow(() -> template.send(TEST_TOPIC, e));
    }


    @TestConfiguration
    public static class TestConfig<K, V> {

        ...

    }

}
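
Note that the test above skips registration silently when parseSchema returns an empty Optional, so a malformed schema string would only show up later as a failure at send time. One option is to fail at the parse step instead. The sketch below shows that pattern with a stand-in parser: FailFastParse, parse, and parseOrFail are hypothetical and only mimic the Optional-returning shape of parseSchema.

```java
import java.util.Optional;

// Hypothetical stand-in for an Optional-returning schema parser.
final class FailFastParse {

    // Toy rule for illustration only: anything that starts with '{' counts as parseable.
    static Optional<String> parse(String schema) {
        if (schema != null && schema.trim().startsWith("{")) {
            return Optional.of(schema.trim());
        }
        return Optional.empty();
    }

    // Fails immediately instead of silently skipping the registration step.
    static String parseOrFail(String schema) {
        return parse(schema).orElseThrow(
                () -> new IllegalStateException("Schema could not be parsed: " + schema));
    }

    public static void main(String[] args) {
        System.out.println(parseOrFail("{\"type\":\"object\"}"));
        try {
            parseOrFail("some_junk");
        } catch (IllegalStateException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
    }
}
```

In the real test, the same idea would amount to calling orElseThrow on the result of parseSchema before registering it.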

Similarly, to test that an exception is thrown, register a wrong schema or try publishing a different event.

    @Test
    void should_throw_error_on_invalid_schema() throws RestClientException, IOException {

        Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SOME_INVALID_SCHEMA_STRING,
                Collections.emptyList());

        if(parsedSchema.isPresent()){
            ParsedSchema schema = parsedSchema.get();
            schemaRegistryClient.register(SUBJECT, schema,1,1);
        }

        SampleEvent e = new SampleEvent();


        // send and assert that exceptions are thrown
        Assertions.assertThrows(Exception.class,
                () -> template.send(TEST_TOPIC, e));
    }
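
Both tests only observe exceptions that template.send throws on the calling thread. Depending on the spring-kafka version, send returns a ListenableFuture or a CompletableFuture, and a failure carried by that future would go unnoticed unless the test waits on it. The sketch below shows the difference using only java.util.concurrent; AsyncFailureDemo and sendAsync are hypothetical stand-ins, not Spring or Kafka APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for an asynchronous send that reports errors via its future.
final class AsyncFailureDemo {

    // Never throws directly: an empty payload is reported through the returned future.
    static CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (payload.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty payload"));
        } else {
            future.complete("sent:" + payload);
        }
        return future;
    }

    // Waits for the result; only at this point does a deferred failure become visible.
    static boolean failsWhenAwaited(String payload) {
        try {
            sendAsync(payload).join();
            return false;
        } catch (CompletionException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        sendAsync(""); // returns normally even though the future has failed
        System.out.println(failsWhenAwaited(""));      // prints true
        System.out.println(failsWhenAwaited("hello")); // prints false
    }
}
```

If a test also needs to cover failures reported this way, waiting on the future returned by send inside the assertion is one way to do it.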

And voila!!!

I hope that this article will be a good and helpful resource on testing Kafka Schema Registry implementation involving JSON schema.
