Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Not A Bug
- Affects Version/s: 2.3.0
- None
- None
Description
I have a cluster with 3 brokers running version 0.11.
My kafka-streams app was using kafka-client 0.11.0.1, but I recently migrated it to 2.3.0.
I have not executed any migration, as my data is disposable; instead I deleted all intermediate topics, keeping only the input and output topics.
My streams config is:
application.id = consumer-id-v1.00
application.server =
bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 524288000
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class com.acme.stream.TimeExtractor
default.value.serde = class com.acme.serde.MyDtoSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 25
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
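For readers who want to reproduce the setup, a handful of the values above could be supplied through a plain java.util.Properties object, roughly as sketched here. The class name StreamsProps and the choice of which keys to include are assumptions made for illustration; the keys and values themselves are copied from the dump above, and the remaining entries would be added the same way.

```java
import java.util.Properties;

// Hypothetical helper, not part of the reporter's application.
public class StreamsProps {

    // Builds a Properties object holding a few of the settings listed in the report.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "consumer-id-v1.00");
        props.setProperty("bootstrap.servers", "foo1:9092,foo2:9092,foo3:9092");
        props.setProperty("num.stream.threads", "25");
        props.setProperty("cache.max.bytes.buffering", "524288000");
        props.setProperty("commit.interval.ms", "30000");
        props.setProperty("processing.guarantee", "at_least_once");
        props.setProperty("state.dir", "/tmp/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the logged config.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```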
In my stream I am using withLoggingDisabled:
stream.filter((key, val) -> val != null)
      .selectKey((key, val) -> getId(val))
      .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
      .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                             .grace(windowRetentionPeriodDuration))
      .aggregate(MyDto::new,
                 new MyUpdater(),
                 Materialized.as("aggregation-updater")
                             .withLoggingDisabled()
                             .with(Serdes.String(), new MyDtoSerde()))
      .toStream((k, v) -> k.key())
      .mapValues(val -> { ...
However, changelog topics (KSTREAM-AGGREGATE-STATE-STORE) are still created, regardless of whether I delete them before running the app again or change the application.id.
With a new application.id, the topics are recreated with the new prefix.
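A likely explanation, consistent with the Not A Bug resolution and the linked documentation issue: Materialized.with(keySerde, valueSerde) is a static factory method, and Java allows a static method to be called through an instance expression. In that case the instance built by as(...).withLoggingDisabled() is discarded and a fresh, unnamed configuration with logging enabled is used, which would also explain the generated KSTREAM-AGGREGATE-STATE-STORE name. The sketch below reproduces that Java behaviour with a hypothetical StoreConfig class; it is a stand-in written for illustration, not the Kafka Streams Materialized class, and its serde arguments are plain strings.

```java
// Hypothetical stand-in for a builder-style config object; NOT the Kafka Streams API.
public class StaticFactoryPitfall {

    static final class StoreConfig {
        final String name;            // null means "let the framework generate a name"
        final boolean loggingEnabled; // true means a changelog topic would be created
        final String keySerde;
        final String valueSerde;

        private StoreConfig(String name, boolean loggingEnabled, String keySerde, String valueSerde) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        // Static factory: starts a new config with a store name.
        static StoreConfig as(String name) {
            return new StoreConfig(name, true, null, null);
        }

        // Static factory: starts a NEW config with serdes only (no name, logging on).
        static StoreConfig with(String keySerde, String valueSerde) {
            return new StoreConfig(null, true, keySerde, valueSerde);
        }

        // Instance methods: refine the config they are called on.
        StoreConfig withLoggingDisabled() {
            return new StoreConfig(name, false, keySerde, valueSerde);
        }

        StoreConfig withKeySerde(String serde) {
            return new StoreConfig(name, loggingEnabled, serde, valueSerde);
        }

        StoreConfig withValueSerde(String serde) {
            return new StoreConfig(name, loggingEnabled, keySerde, serde);
        }
    }

    // Mirrors the chain in the report: the trailing static with(...) replaces
    // everything configured before it.
    static StoreConfig broken() {
        return StoreConfig.as("aggregation-updater")
                          .withLoggingDisabled()
                          .with("string-serde", "dto-serde");
    }

    // Uses only instance methods after the initial factory call, so the name
    // and the logging flag are preserved.
    static StoreConfig fixed() {
        return StoreConfig.as("aggregation-updater")
                          .withLoggingDisabled()
                          .withKeySerde("string-serde")
                          .withValueSerde("dto-serde");
    }

    public static void main(String[] args) {
        StoreConfig b = broken();
        StoreConfig f = fixed();
        System.out.println("broken: name=" + b.name + ", loggingEnabled=" + b.loggingEnabled);
        System.out.println("fixed:  name=" + f.name + ", loggingEnabled=" + f.loggingEnabled);
    }
}
```

In the real API, Materialized and Grouped both offer the instance methods withKeySerde and withValueSerde, so the same rewrite should apply to the Materialized.as("aggregation-updater") chain and to Grouped.as("key-grouper").with(...). Compiling with javac -Xlint:static flags calls of this kind.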
Issue Links
- relates to KAFKA-8666 Improve Documentation on usage of Materialized config object (Open)