Apache Kafka Essentials – DZone Refcardz

Apache Kafka runs as a distributed cluster in which each node is referred to as a broker. Producers and consumers are clients that write and read event data, respectively, while Kafka Connect streams data between Kafka and external systems. All of these components build on Kafka's durable publish-subscribe messaging layer to exchange event data between servers, processes, and applications in near real time.

Pub/Sub in Apache Kafka

The first component of Kafka deals with the production and consumption of records. The following table describes a few key concepts in Kafka:

Topic – Defines a logical name for producing and consuming records

Partition – Defines a non-overlapping subset of records within a topic

Offset – A unique sequential number assigned to each record within a topic partition

Record – Contains a key, value, timestamp, and list of headers

Broker – Server where records are stored; multiple brokers can be used to form a cluster

Figure 2 depicts a topic with two partitions. Partition 0 has 5 records, with offsets from 0 to 4, and partition 1 has 4 records, with offsets from 0 to 3.

Figure 2: Partitions in a topic
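
As a point of reference, a topic like the one in Figure 2 can be created with the kafka-topics.sh tool that ships with Kafka; the topic name and partition count below are only illustrative:

bin/kafka-topics.sh --create
       --zookeeper localhost:2181
       --replication-factor 1
       --partitions 2
       --topic test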

The following code snippet shows how to produce records to the topic, "test", using the Java API:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
   "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
   "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
// send a record with key "key" and value "value" to the topic "test"
producer.send(new ProducerRecord<String, String>("test", "key", "value"));

In the example above, both the key and value are strings, so we use a StringSerializer. A custom serializer can be plugged in when the types become more complex (a sketch follows the consumer example below). The following code snippet shows how to consume records with string keys and values in Java:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// a consumer group id is required when using subscribe()
props.put("group.id", "test-group");
props.put("key.deserializer",
   "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
   "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
   // poll for new records, waiting up to 100 ms
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records)
      System.out.printf("offset=%d, key=%s, value=%s%n",
         record.offset(), record.key(), record.value());
   // commit the offsets of the records consumed in this batch
   consumer.commitSync();
}
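
As mentioned above, the serializer can be customized when record types become more complex. The following is a minimal sketch of a custom value serializer; the Person class and its hand-rolled JSON encoding are hypothetical and only illustrate the Serializer interface:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical domain type used only for illustration
class Person {
   String name;
   int age;
}

class PersonSerializer implements Serializer<Person> {
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) { }

   @Override
   public byte[] serialize(String topic, Person person) {
      if (person == null) return null;
      // encode the object as a simple JSON string; a real implementation
      // would typically delegate to a JSON or Avro library
      String json = "{\"name\":\"" + person.name + "\",\"age\":" + person.age + "}";
      return json.getBytes(StandardCharsets.UTF_8);
   }

   @Override
   public void close() { }
}

The class name is then passed as the value.serializer property, just like StringSerializer above.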

Records within a partition are always delivered to consumers in offset order. By saving the offset of the last consumed record from each partition, a consumer can resume from where it left off after a restart. In the consumer example above, we use the commitSync() API to save the offsets explicitly after consuming a batch of records. Offsets can also be committed automatically by setting the property enable.auto.commit to true.
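
For example, committing offsets automatically only requires two consumer properties (the commit interval below is an arbitrary illustration):

// commit offsets automatically in the background instead of calling commitSync()
props.put("enable.auto.commit", "true");
// how frequently (in ms) the consumer commits its offsets; value is illustrative
props.put("auto.commit.interval.ms", "1000");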

A record in Kafka is not removed from the broker immediately after it is consumed. Instead, it is retained according to a configured retention policy. The following are two common retention policies:

  • log.retention.hours – number of hours to keep a record on the broker
  • log.retention.bytes – maximum size of records retained in a partition
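
These settings typically go in the broker's server.properties file; the values below are purely illustrative:

# keep records for up to 7 days
log.retention.hours=168
# retain at most roughly 1 GB of records per partition
log.retention.bytes=1073741824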

Kafka Connect

The second component is Kafka Connect, which is a framework that makes it easy to stream data between Kafka and other systems. Users can deploy a Connect cluster and run various connectors to import data from different sources into Kafka (through Source Connectors) and export data from Kafka further (through Sink Connectors) to storage platforms such as HDFS, S3, or Elasticsearch.

The benefits of using Kafka Connect are:

  • Parallelism and fault tolerance
  • Avoidance of ad hoc code by reusing existing connectors
  • Built-in offset and configuration management

Quick Start for Kafka Connect

The following steps show how to run the existing file connector in standalone mode to copy the content from a source file to a destination file via Kafka:

  1. Prepare some data in a source file:
> echo -e "hello\nworld" > test.txt

  2. Start a file source and a file sink connector:
> bin/connect-standalone.sh
      config/connect-standalone.properties
      config/connect-file-source.properties
      config/connect-file-sink.properties

  3. Verify the data in the destination file:
> more test.sink.txt
hello
world

  4. Verify the data in Kafka:
> bin/kafka-console-consumer.sh
      --bootstrap-server localhost:9092
      --topic connect-test
      --from-beginning
   {"schema":{"type":"string","optional":false},"payload":"hello"}
   {"schema":{"type":"string","optional":false},"payload":"world"}

In the example above, the data in the source file, test.txt, is first streamed into a Kafka topic, connect-test, through the file source connector. The records in connect-test are then streamed into the destination file, test.sink.txt. If a new line is added to test.txt, it shows up almost immediately in test.sink.txt. Note that we achieve this by running two connectors without writing any custom code.
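
The two connector property files used above are short; for reference, the file source configuration that ships with Kafka looks roughly like the following (the sink configuration is analogous but points at test.sink.txt):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test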

Connectors are powerful tools that allow for integration of Apache Kafka into many other systems. There are many open-source and commercially supported options for integrating Apache Kafka — both at the connector layer as well as through an integration services layer — that can provide much more flexibility in message transformation.

Transformations in Connect

Connect is primarily designed to stream data between systems as-is, whereas Kafka Streams is designed to perform complex transformations once the data is in Kafka. That said, Kafka Connect provides a mechanism used to perform simple transformations per record. The following example shows how to enable a couple of transformations in the file source connector:

  1. Add the following lines to connect-file-source.properties:
transforms=MakeMap,InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

  2. Start a file source connector:
> bin/connect-standalone.sh
      config/connect-standalone.properties
      config/connect-file-source.properties

  3. Verify the data in Kafka:
> bin/kafka-console-consumer.sh
      --bootstrap-server localhost:9092
      --topic connect-test
      --from-beginning
{"line":"hello","data_source":"test-file-source"}
{"line":"world","data_source":"test-file-source"}

In step one above, we add two transformations, MakeMap and InsertSource, which are implemented by the classes HoistField$Value and InsertField$Value, respectively. The first one wraps each input string in a map under the field name line. The second one adds an additional field, data_source, that indicates the name of the source file. After the transformation logic is applied, the data in the input file appears in the transformed form shown in step three.
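
Beyond the built-in transformations, it is also possible to write your own by implementing the Transformation interface. The sketch below is a rough, hypothetical example that upper-cases string values; it is not part of the Kafka distribution:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// hypothetical single message transform that upper-cases string values
public class UpperCaseValue<R extends ConnectRecord<R>> implements Transformation<R> {
   @Override
   public void configure(Map<String, ?> configs) { }

   @Override
   public R apply(R record) {
      Object value = record.value();
      Object newValue = (value instanceof String)
         ? ((String) value).toUpperCase()
         : value;
      // create a copy of the record with the transformed value
      return record.newRecord(record.topic(), record.kafkaPartition(),
         record.keySchema(), record.key(),
         record.valueSchema(), newValue, record.timestamp());
   }

   @Override
   public ConfigDef config() { return new ConfigDef(); }

   @Override
   public void close() { }
}

Such a class would be packaged onto the Connect worker's plugin path and referenced from a transforms.*.type property, just like HoistField$Value above.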

Connect REST API

In production, Kafka Connect usually runs in distributed mode and can be managed through REST APIs. The following table lists the common APIs. See the Apache Kafka documentation for more information.

GET /connectors – Return a list of active connectors

POST /connectors – Create a new connector

GET /connectors/{name} – Get information for the connector

GET /connectors/{name}/config – Get configuration parameters for the connector

PUT /connectors/{name}/config – Update configuration parameters for the connector

GET /connectors/{name}/status – Get the current status of the connector
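
For example, a connector equivalent to the file source used earlier can be created on a distributed Connect worker with a single POST request. The example below assumes the worker's default REST port of 8083; the connector name is arbitrary:

> curl -X POST -H "Content-Type: application/json" \
     http://localhost:8083/connectors \
     --data '{"name": "local-file-source",
              "config": {"connector.class": "FileStreamSource",
                         "tasks.max": "1",
                         "file": "test.txt",
                         "topic": "connect-test"}}'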

Kafka Streams

Kafka Streams is a client library for building real-time applications and microservices where the input and/or output data is stored in Kafka. The benefits of using Kafka Streams are:

  • Less code in the application
  • Built-in state management
  • Lightweight
  • Parallelism and fault tolerance

The most common way of using Kafka Streams is through the Streams DSL, which includes operations such as filtering, joining, grouping, and aggregation. The following code snippet shows the main logic of a Streams example called WordCountDemo:

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
StreamsBuilder builder = new StreamsBuilder();
// build a stream from an input topic
KStream<String, String> source = builder.stream(
   "streams-plaintext-input",
   Consumed.with(stringSerde, stringSerde));
// split each line into lowercase words, group by word, and count occurrences
KTable<String, Long> counts = source
   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
   .groupBy((key, value) -> value)
   .count();
// write the word counts to an output topic
counts.toStream().to("streams-wordcount-output",
   Produced.with(stringSerde, longSerde));

The code above first creates a stream from an input topic, streams-plaintext-input. It then applies a transformation to split each input line into words. Next, it counts the number of occurrences of each unique word. Finally, the results are written to an output topic, streams-wordcount-output.
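
The snippet above only builds the topology. To actually run it, the topology is wrapped in a KafkaStreams instance together with a few required configuration properties, roughly as follows (the application ID below is arbitrary):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
// identifies this Streams application; also used to prefix internal topic names
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// default serdes used when none are specified on an operation
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// close the application cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));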

The following are steps to run the example code:

  1. Create the input topic:
bin/kafka-topics.sh --create
       --zookeeper localhost:2181
       --replication-factor 1
       --partitions 1
       --topic streams-plaintext-input

  2. Run the stream application:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

  3. Produce some data in the input topic:
bin/kafka-console-producer.sh
        --broker-list localhost:9092
        --topic streams-plaintext-input
    hello world

  4. Verify the data in the output topic:
bin/kafka-console-consumer.sh
        --bootstrap-server localhost:9092
        --topic streams-wordcount-output
        --from-beginning
        --formatter kafka.tools.DefaultMessageFormatter
        --property print.key=true
        --property print.value=true
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
   hello 1
   world 1

KStream vs. KTable

There are two key concepts in Kafka Streams: KStream and KTable. A topic can be viewed as either of the two. Their differences are summarized in the table below:

Concept:
KStream – Each record is treated as an append to the stream.
KTable – Each record is treated as an update to an existing key.

Usage:
KStream – Model append-only data (e.g., click streams).
KTable – Model updatable reference data (e.g., user profiles).

The following example illustrates their differences:

(Key, Value) Records: ("k1", 2), ("k1", 5)

Sum of Values as KStream: 7

Sum of Values as KTable: 5

When the topic is viewed as a KStream, there are two independent records and, thus, the sum of the values is 7. On the other hand, if the topic is viewed as a KTable, the second record is treated as an update to the first record since they have the same key, "k1". Therefore, only the second record is retained, and the sum is 5 instead.
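
In code, the choice is made when the topic is read: builder.stream() interprets the topic as a KStream, while builder.table() interprets it as a KTable. A minimal sketch (the topic names below are illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// every record is an independent event; ("k1", 2) and ("k1", 5) both remain
KStream<String, Long> clicks = builder.stream(
   "click-events", Consumed.with(Serdes.String(), Serdes.Long()));

// records with the same key are updates; only the latest value per key is kept
KTable<String, Long> profiles = builder.table(
   "user-profile-updates", Consumed.with(Serdes.String(), Serdes.Long()));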

KStreams DSL

The following section outlines a list of common operations available in Kafka Streams.

KStream operations treat each record in the stream as an independent key-value pair. Such operations include:

  1. filter(Predicate) – Create a new KStream that consists of all records of this stream that satisfy the given predicate.

Example:

ks_out = ks_in.filter( (key, value) -> value > 5 );
ks_in:  ("k1", 2), ("k2", 7)
ks_out: ("k2", 7)

  2. map(KeyValueMapper) – Transform each record of the input stream into a new record in the output stream (both key and value type can be altered arbitrarily).

Example:

ks_out = ks_in.map( (key, value) -> new KeyValue<>(key, key) );
ks_in:  ("k1", 2), ("k2", 7)
ks_out: ("k1", "k1"), ("k2", "k2")

  3. groupByKey() – Group the records by their current key into a KGroupedStream while preserving the original values.

Example:

ks_out = ks_in.groupByKey();
ks_in:  ("k1", 1), ("k2", 2), ("k1", 3)
ks_out: ("k1", (("k1", 1), ("k1", 3))), ("k2", (("k2", 2)))

  4. join(KTable, ValueJoiner) – Join records of the input stream with records from the KTable if the keys from the records match. Return a stream of the key and the combined value using ValueJoiner.

Example:

ks_out = ks_in.join( kt, (value1, value2) -> value1 + value2 );
ks_in:  ("k1", 1), ("k2", 2), ("k3", 3)
kt:     ("k1", 11), ("k2", 12), ("k4", 13)
ks_out: ("k1", 12), ("k2", 14)

  5. join(KStream, ValueJoiner, JoinWindows) – Join records of the two streams if the keys match and the timestamps of the records satisfy the time constraint specified by JoinWindows. Return a stream of the key and the combined value using ValueJoiner.

Example:

ks_out = ks1.join( ks2, (value1, value2) -> value1 + value2, JoinWindows.of(100) );
ks1: ("k1", 1, 100t), ("k2", 2, 200t), ("k3", 3, 300t)
ks2: ("k1", 11, 150t), ("k2", 12, 350t), ("k4", 13, 380t)
* t indicates a timestamp.
ks_out: ("k1", 12)

KGroupedStream operations, in contrast, work on records that have already been grouped by key. Such operations include:

  1. count() – Count the number of records in this stream by the grouped key and return it as a KTable.

Example:

kt = kgs.count();
kgs: ("k1", (("k1", 1), ("k1", 3))), ("k2", (("k2", 2)))
kt:  ("k1", 2), ("k2", 1)

  2. reduce(Reducer) – Combine the values of records in this stream by the grouped key and return it as a KTable.

Example:

kt = kgs.reduce( (aggValue, newValue) -> aggValue + newValue );
kgs: ("k1", (("k1", 1), ("k1", 3))), ("k2", (("k2", 2)))
kt:  ("k1", 4), ("k2", 2)

  3. windowedBy(Windows) – Further group the records by their timestamp and return it as a TimeWindowedKStream.

Example:

twks = kgs.windowedBy( TimeWindows.of(100) );
kgs: ("k1", (("k1", 1, 100t), ("k1", 3, 150t))), ("k2", (("k2", 2, 100t), ("k2", 4, 250t)))
* t indicates a timestamp.
twks: ("k1", 100t -- 200t, (("k1", 1, 100t), ("k1", 3, 150t))),
      ("k2", 100t -- 200t, (("k2", 2, 100t))),
      ("k2", 200t -- 300t, (("k2", 4, 250t)))

Additional details on the respective set of operations on KTable and KGroupedTable can be found in the Kafka Documentation.

Querying States in KStreams

While processing data in real time, a KStreams application maintains local state, such as the word counts in the previous example. This state can be queried interactively through an API described in the Interactive Queries section of the Kafka documentation, which avoids the need for an external data store to export and serve it.
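
As a rough sketch building on the WordCount example, if the count is materialized into a named state store (the name counts-store below is hypothetical), the running streams instance can serve lookups directly:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// when building the topology, give the count a queryable store name
// (source is the KStream from the WordCount example above)
KTable<String, Long> counts = source
   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
   .groupBy((key, value) -> value)
   .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

// once the application is running (streams.start()), query the local store
ReadOnlyKeyValueStore<String, Long> store =
   streams.store("counts-store", QueryableStoreTypes.keyValueStore());
Long helloCount = store.get("hello");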

Exactly-Once Processing in KStreams

Failures in the brokers or the clients may introduce duplicates during the processing of records. KStreams provides the capability of processing records exactly once, even under failures. This can be achieved by simply setting the property, processing.guarantee, to exactly_once in KStreams.
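
In code, this amounts to one entry in the Streams configuration (config here refers to the same Properties object passed to KafkaStreams in the earlier sketch):

import org.apache.kafka.streams.StreamsConfig;

// enable exactly-once processing for this Streams application
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// equivalent to: config.put("processing.guarantee", "exactly_once");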

Documenting and Visualizing Data Streams

Processing stream data involves correlating data between streams, analyzing patterns, and understanding application/user behavior to make predictions. In a typical event stream, events are continuously flowing through Apache Kafka clusters, with a rich ecosystem of clients and data sinks integrating with the library. Making real-time decisions from data streams, therefore, requires the enforcement of comprehensive observability, event ownership, and governance.

Governing data in motion is often a shared responsibility between developers, DataOps, and business development teams. This often starts with business managers defining the business and technical requirements for event management, while data engineers are managing client permissions for creating internal topics. Subsequently, developers/system admins configure security settings within the clients and then administer each client’s corresponding settings within Kafka Streams.

Although Kafka lacks a built-in event portal to catalog event streams and visualize the topology of data pipelines, the Kafka Streams client library offers some visibility into data lineage, making it possible to identify the origin and path taken by an event. It is possible to extend Kafka by integrating it with an event management platform that supports the discovery, visualization, and cataloging of event streams. Such platforms detect new data schemas, connectors, client groups, and events as they occur, which streamlines the recording of event changes. These tools help uncover deeper analytical insights and untangle complex data relationships while also fostering efficient collaboration among distributed teams.

