Unable to encode element ‘org.apache.beam.sdk.io.kafka.KafkaRecord@6f30ad3’ with coder ‘KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)’

I am trying to run a pipeline using apache-beam with source as one kafka topic and destination as another kafka topic. I have written my code and is working well(ie, no error in code I think). But whenever I try to push any msg from my source topic i get this error

> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for instruction 13: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@6f30ad3' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1683)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2194)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2363)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2081)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:315)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:218)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:326)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@6f30ad3' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:377)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:63)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:70)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:377)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2194)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2363)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2081)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:315)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:218)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:326)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):
  File "kafkabeamflink.py", line 21, in <module>
    run_pipeline()
  File "kafkabeamflink.py", line 8, in run_pipeline
    (p
  File "/home/yashdeep/.local/lib/python3.8/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/home/yashdeep/.local/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 599, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-yashdeep-0602142400-ea8a5123_065c8856-2653-4097-961d-825d0e097207 failed in state FAILED: java.lang.RuntimeException: Error received from SDK harness for instruction 13: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@6f30ad3' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1683)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2194)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2363)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2081)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:315)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:218)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:326)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'org.apache.beam.sdk.io.kafka.KafkaRecord@6f30ad3' with coder 'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)'.
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:377)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:63)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:70)
    at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
    at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:377)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2194)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2363)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:750)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2081)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:378)
    at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1048)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:637)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:632)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:262)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:214)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:315)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:218)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:326)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I don’t know where to change what. I have done all the pre-requistes mentioned in the flinkRunner page by beam. This is the code :

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka

def run_pipeline():
    with beam.Pipeline(options=PipelineOptions()) as p:
       (p
        | 'Read from Kafka' >> ReadFromKafka(consumer_config={'bootstrap.servers':'localhost:9092',
                                                  'auto.offset.reset': 'latest'}, topics=['demo'])
        | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(10))
        | 'Group by key' >> beam.GroupByKey()
        | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers':'localhost:9092'},
                                                              topic="demo-output"))

if __name__ == '__main__':
    run_pipeline()

https://maximilianmichels.com/2020/getting-started-with-beam-python/

This is the actual blog post that I am trying to follow.

Leave a Comment