Description
Example topology:
builder
  .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
  .groupBy((key, value) => new KeyValue(value, key))
  .count()
  .toStream()
  .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
At runtime, we get the following exception:
org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
The error does not indicate which line of the topology or which processor caused the issue.
Here, a Grouped was missing from the groupBy call. Because the groupBy API does not require a Grouped, it is easy to omit, and the omission can be difficult to spot in a more complex topology.
This is also a problem for someone who needs explicit control over the serdes in the topology and does not want to define default serdes.
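For reference, a minimal sketch of the same topology with the Grouped supplied explicitly, assuming that passing serdes through Grouped is enough for the count() state store and the repartition topic to avoid the default-serde lookup. The object name GroupedTopologySketch and the val swap are hypothetical and only there to keep the example self-contained; the mapper is given an explicit type to avoid relying on lambda type inference.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Grouped, KeyValueMapper, Produced}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object GroupedTopologySketch {

  def build(): Topology = {
    val builder = new StreamsBuilder()

    // Swap key and value so that records are grouped by the original value.
    val swap: KeyValueMapper[String, String, KeyValue[String, String]] =
      (key, value) => new KeyValue(value, key)

    builder
      .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
      // Explicit serdes for the regrouped key and value, so that no default
      // serde has to be configured (assumption stated in the text above).
      .groupBy(swap, Grouped.`with`(Serdes.String(), Serdes.String()))
      .count()
      .toStream()
      .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))

    builder.build()
  }
}
```

Calling GroupedTopologySketch.build().describe() prints the resulting processor graph, which can help locate the stateful node that the stack trace does not name.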