KAFKA-16573

Streams does not specify where a Serde is needed


Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.7.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Example topology:

       builder
         .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
         .groupBy((key, value) => new KeyValue(value, key))
         .count()
         .toStream()
         .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
       

      At runtime, we get the following exception:

      Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
      org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
          at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
          at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
          at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
          at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
          at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
          at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
          at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
          at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)

      The error does not indicate which line or which processor caused the issue.

      Here, a Grouped was missing from the groupBy call. Because the groupBy API does not require a Grouped, it is easy to omit, and the omission can be hard to spot in a more complex topology.

      This also affects anyone who needs control over serdes in the topology and does not want to define default serdes.
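
      For reference, a minimal sketch of the same topology with the missing Grouped supplied, so that no default serde is looked up for the count store. The object name GroupedSerdeSketch, the buildTopology helper, and the explicit KeyValueMapper instance are illustrative assumptions, not part of the original report; the mapper is written as an anonymous class only to avoid relying on lambda-to-SAM inference, which can vary between Scala versions.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Grouped, KeyValueMapper, Produced}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object GroupedSerdeSketch {

  // Swaps key and value, as in the example topology from the report.
  private val swapKeyAndValue =
    new KeyValueMapper[String, String, KeyValue[String, String]] {
      override def apply(key: String, value: String): KeyValue[String, String] =
        new KeyValue(value, key)
    }

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()

    builder
      .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
      // Passing Grouped here provides the key and value serdes for the
      // grouped records, instead of falling back to the default serdes.
      .groupBy(swapKeyAndValue, Grouped.`with`(Serdes.String(), Serdes.String()))
      .count()
      .toStream()
      .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))

    builder.build()
  }
}
```

      Calling GroupedSerdeSketch.buildTopology() only builds the topology; the exception in the report is thrown later, when the stream task initializes its state stores, which is why the missing serde is not caught at build time.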

       

        


          People

            ayoubomari Ayoub Omari
