Details
- Bug
- Status: Patch Available
- Major
- Resolution: Unresolved
- 2.4.1
- None
- None
Description
When creating a GlobalKTable to obtain a ReadOnlyKeyValueStore, like below:
GlobalKTable<GenericRecord, GenericRecord> globalTable = streamsBuilder.globalTable(topic, Consumed.with(keySerde, valueSerde), Materialized.as(Stores.inMemoryKeyValueStore(topic)));
I got a StreamsException like the one below:
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
But according to the GlobalKTable Javadoc, the changelog stream shall not be created, and in fact it was not created. This leads our custom serde to look up the schema (we are using Confluent Platform and an Avro-based schema registry for the job) using the wrong topic name (it should be just sourceTopicName rather than applicationId-sourceTopicName-changelog).
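To make the failure concrete, here is a minimal sketch in plain Java (no Kafka or Confluent dependencies) of why the lookup ends in "Subject not found". It assumes the registry's default topic-based subject naming (`<topic>-key` / `<topic>-value`) and the usual Kafka Streams changelog naming (`<applicationId>-<storeName>-changelog`). The class `SubjectLookupSketch` and its methods are hypothetical names for illustration only, and the set of registered subjects is made up for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model for illustration; this is not Kafka or Confluent code.
final class SubjectLookupSketch {

    // Kafka Streams names a store's changelog topic <applicationId>-<storeName>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Assumption: topic-based subject naming, i.e. <topic>-key or <topic>-value.
    static String subject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // True if a schema subject for the given topic is present in the (simulated) registry.
    static boolean isRegistered(Set<String> registeredSubjects, String topic, boolean isKey) {
        return registeredSubjects.contains(subject(topic, isKey));
    }

    public static void main(String[] args) {
        // Only the source topic has schemas registered (example data, assumed).
        Set<String> registered = new HashSet<>(
                Arrays.asList("sourceTopicName-key", "sourceTopicName-value"));

        // In the report the store is named after the topic, so storeName == sourceTopicName.
        String wrongTopic = changelogTopic("applicationId", "sourceTopicName");

        System.out.println("topic handed to the serde: " + wrongTopic);
        System.out.println("found with changelog name: " + isRegistered(registered, wrongTopic, false));
        System.out.println("found with source name:    " + isRegistered(registered, "sourceTopicName", false));
    }
}
```

Because the changelog topic is never created for a GlobalKTable, nothing ever registers a subject under that name, so the lookup can only succeed with the source topic name.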
After digging into the code, I found that the initStoreSerde method in MeteredTimestampedKeyValueStore assumes the topic backing the store is always storeChangelogTopic when initializing the Serdes for the state store. I think that for GlobalKTables (those whose ProcessorContext is a GlobalProcessorContextImpl) we should use the original topic name directly here.
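The proposed behaviour could look roughly like the sketch below. It is not the attached patch and does not use Kafka's internal classes: `SerdeTopicSketch`, `serdeTopic`, and the `isGlobalStore` flag are hypothetical stand-ins, with the flag standing in for a check on whether the store's context is a GlobalProcessorContextImpl.

```java
// Hypothetical sketch of the proposed topic selection; not the actual Kafka Streams code.
final class SerdeTopicSketch {

    // Kafka Streams names a store's changelog topic <applicationId>-<storeName>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Picks the topic name handed to the serdes when a state store is initialized.
    // isGlobalStore is an assumed flag: true when the store backs a GlobalKTable.
    static String serdeTopic(boolean isGlobalStore,
                             String applicationId,
                             String storeName,
                             String sourceTopic) {
        if (isGlobalStore) {
            // A global store is populated directly from the source topic and has no changelog.
            return sourceTopic;
        }
        // Regular stores are backed by their changelog topic.
        return changelogTopic(applicationId, storeName);
    }

    public static void main(String[] args) {
        System.out.println(serdeTopic(true, "applicationId", "sourceTopicName", "sourceTopicName"));
        System.out.println(serdeTopic(false, "applicationId", "sourceTopicName", "sourceTopicName"));
    }
}
```

With this selection, a schema-registry-aware serde for a GlobalKTable would be asked about sourceTopicName, while non-global stores keep the existing changelog-based behaviour.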
Attachments
Issue Links
- relates to KAFKA-10179 State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables (Closed)