Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4612

Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Problem
    • 0.10.1.1
    • None
    • streams
    • None
    • Virtual Machine using Debian 8 + Confluent Platform 3.1.1.

    Description

      I've attached a minimal single source file project that reliably reproduces this issue.

      This project does the following:

      1) Create test input data. Produces a single random (String,String) record into two diferent topics "topicInput" and "topicTable"

      2) Creates and runs a Kafka Streams application:

      val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
      val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
      val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
      val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
      joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")

      This reliably generates the following error:

      [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
      java.lang.ClassCastException: java.lang.String cannot be cast to [B
      at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
      at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
      at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
      at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

      One caveat: I'm running this on a Confluent Platform 3.1.1 instance which uses Kafka 0.10.1.0 since there is no newer Confluent Platform available. The Kafka Streams project is built using "kafka-clients" and "kafka-streams" version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure if there is any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka 0.10.1.0. I will obviously try the next Confluent Platform binary when it is available.

      Attachments

        1. KafkaIsolatedBug.tar.gz
          3 kB
          Kurt Ostfeld

        Activity

          People

            Unassigned Unassigned
            kurtsamba Kurt Ostfeld
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: