Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10049

KTable-KTable Foreign Key join throwing Serialization Exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 2.5.0, 2.6.0
    • 2.6.0, 2.5.1
    • streams
    • None

    Description

       I want to make use of KTable-KTable Foreign Key join feature released in 2.5.0 but facing issue while running the code. 

       
      
       public static void main(String[] args) {
      
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
           props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
           StreamsBuilder builder = new StreamsBuilder();
           KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
           KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
      
           KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
      
                  @Override
                  public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
                      EnrichedOrder enOrder = EnrichedOrder.builder()
                              .orderId(order.getOrderId())
                              .execPrice(order.getPrice())
                              .symbol(order.getSymbol())
                              .quanity(order.getQuanity())
                              .side(order.getSide())
                              .filledQty(order.getFilledQty())
                              .leaveQty(order.getLeaveQty())
                              .index(order.getIndex())
                              .vWaprelative(order.getVWaprelative())
                              .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
                              .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
                              .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
                              .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
                              .build();
                      return enOrder;
                  }
              } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
      
           enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
               @Override
              public void apply(String arg0, EnrichedOrder arg1) {
      
                   logger.info(String.format("key = %s, value = %s", arg0, arg1));
              }
          });
      
           KafkaStreams streams = new KafkaStreams(builder.build(), props);
           streams.start();
      
           Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
      }}}
      
      
      
       
      
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.5.0</version>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-streams</artifactId>
              <version>2.5.0</version>
          </dependency>
      
      

      Exception:

      18:49:31.525 [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: 
          org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to a sink topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
              at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) [kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) [kafka-streams-2.5.0.jar:?]
          Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.messages.JSONSerdeCompatible
              at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) ~[classes/:?]
              at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) ~[kafka-streams-2.5.0.jar:?]
              at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-2.5.0.jar:?]
              ... 34 more
      

      Attachments

        1. 10049-bellemare.patch
          25 kB
          Adam Bellemare

        Activity

          People

            abellemare Adam Bellemare
            amicngh Amit Chauhan
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: