KAFKA-8646

Materialized.withLoggingDisabled() does not disable changelog topics creation


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Bug
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      I have a cluster with 3 brokers running version 0.11.

      My Kafka Streams app was using kafka-client 0.11.0.1, but I recently migrated to 2.3.0.

      I have not executed any migration, as my data is disposable; I simply deleted all intermediate topics, keeping only the input and output topics.

      My streams config is: 

      application.id = consumer-id-v1.00
      application.server =
      bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
      buffered.records.per.partition = 1000
      cache.max.bytes.buffering = 524288000
      client.id =
      commit.interval.ms = 30000
      connections.max.idle.ms = 540000
      default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
      default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
      default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
      default.timestamp.extractor = class com.acme.stream.TimeExtractor
      default.value.serde = class com.acme.serde.MyDtoSerde
      max.task.idle.ms = 0
      metadata.max.age.ms = 300000
      metric.reporters = []
      metrics.num.samples = 2
      metrics.recording.level = INFO
      metrics.sample.window.ms = 30000
      num.standby.replicas = 0
      num.stream.threads = 25
      partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
      poll.ms = 100
      processing.guarantee = at_least_once
      receive.buffer.bytes = 32768
      reconnect.backoff.max.ms = 1000
      reconnect.backoff.ms = 50
      replication.factor = 1
      request.timeout.ms = 40000
      retries = 0
      retry.backoff.ms = 100
      rocksdb.config.setter = null
      security.protocol = PLAINTEXT
      send.buffer.bytes = 131072
      state.cleanup.delay.ms = 600000
      state.dir = /tmp/kafka-streams
      topology.optimization = none
      upgrade.from = null
      windowstore.changelog.additional.retention.ms = 86400000
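
      For reference, a minimal sketch of how the non-default parts of this configuration might be assembled in code (the StreamsConfig constants are the standard keys for the properties above; MyDtoSerde and TimeExtractor are the com.acme classes named in the dump):

      import java.util.Properties;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.StreamsConfig;

      Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "consumer-id-v1.00");
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo1:9092,foo2:9092,foo3:9092");
      props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 25);
      props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 524288000L);
      props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
      // com.acme classes from the config dump above
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MyDtoSerde.class);
      props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimeExtractor.class);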
      

      In my stream I am using withLoggingDisabled:

      stream.filter((key, val) -> val!=null)
          .selectKey((key, val) -> getId(val))
          .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
          .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                                 .grace(windowRetentionPeriodDuration))
          .aggregate(MyDto::new,
                     new MyUpdater(),
                     Materialized.as("aggregation-updater")
                                 .withLoggingDisabled()
                                 .with(Serdes.String(), new MyDtoSerde()))
          .toStream((k, v) -> k.key())
          .mapValues(val -> { ...
      

      But the changelog topics (KSTREAM-AGGREGATE-STATE-STORE) are created anyway, no matter whether I delete them before running the app again or change the application.id.

      With a new application.id, the topics are simply recreated with the new prefix.
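
      The Not A Bug resolution is consistent with a subtle detail of this API: Materialized.with(keySerde, valueSerde) is a static factory method, so chaining it after withLoggingDisabled() compiles (Java allows static methods to be called through an instance reference) but returns a brand-new Materialized on which logging is enabled again and the store name is dropped, which is why the topics carry a generated KSTREAM-AGGREGATE-STATE-STORE name instead of "aggregation-updater". A sketch of a chain that keeps logging disabled, reusing the types and variables from the snippet above (the input topic name is assumed):

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.common.utils.Bytes;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.kstream.Grouped;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.Materialized;
      import org.apache.kafka.streams.kstream.TimeWindows;
      import org.apache.kafka.streams.state.WindowStore;

      StreamsBuilder builder = new StreamsBuilder();
      KStream<String, MyDto> stream = builder.stream("input-topic");  // assumed topic name

      stream.filter((key, val) -> val != null)
            .selectKey((key, val) -> getId(val))
            // Grouped.with(name, keySerde, valueSerde) is a single static call;
            // chaining .with(...) after Grouped.as(...) would likewise discard the name.
            .groupByKey(Grouped.with("key-grouper", Serdes.String(), new MyDtoSerde()))
            .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
                                   .grace(windowRetentionPeriodDuration))
            .aggregate(MyDto::new,
                       new MyUpdater(),
                       // withKeySerde/withValueSerde are instance methods, so they keep
                       // the store name and the withLoggingDisabled() flag set before them.
                       Materialized.<String, MyDto, WindowStore<Bytes, byte[]>>as("aggregation-updater")
                                   .withLoggingDisabled()
                                   .withKeySerde(Serdes.String())
                                   .withValueSerde(new MyDtoSerde()));

      With this chain no changelog topic is created for the store, at the price of losing fault-tolerant restoration of its contents.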

       


            People

              bbejeck Bill Bejeck
              jmhostalet jmhostalet
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue
