Kafka / KAFKA-7882

StateStores are frequently closed during the 'transform' method


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Hello, I have a problem with the state store being closed frequently while transforming incoming records. To ensure that only one record per key and window reaches an aggregate, I have created a custom Transformer (I know something similar is going to be introduced with the suppress method on KTable in the future, but my implementation is quite simple and IMO should work correctly) with the following implementation:

      override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
        val partition = context.partition()
        if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
        else logger.warn("-1 partition")

        null // Ensures no 1:1 forwarding; context.forward and commit logic live in the punctuator callback
      }
      

       

      What I do get is the following error:

      Store MyStore is currently closed

      This problem appears only when the number of stream threads (or input topic partitions) is greater than 1, even if I only write to the store and turn punctuation off.

      If punctuation is enabled, however, I sometimes get -1 as the partition value in the transform method. I'm familiar with the basic docs, but I haven't found anything that could help me here.

      I build my state store like this:

      val stateStore = Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
          stateStoreName,
          timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
          timeWindows.segments,
          timeWindows.sizeMs,
          false
        ),
        serde[K],
        serde[(V, Int)]
      )
      

      and include it in a DSL API like this:

      builder.addStateStore(stateStore)
      (...).transform(new MyTransformer(...), "MyStore")
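
One thing worth checking with the wiring above: Kafka Streams expects a fresh Transformer per task, and the documented contract of TransformerSupplier is that get() returns a new instance on every call. If a single instance ends up shared between tasks (for example, when several stream threads are running), a store reference captured by one task can be closed when another task shuts down. Whether that is what happens here is an assumption, not something this report confirms. A minimal sketch of the per-task pattern, written against the 2.x Java interfaces; DedupTransformer and DedupTransformerSupplier are hypothetical stand-ins for MyTransformer:

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier, Windowed}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.WindowStore

// Hypothetical stand-in for the reporter's MyTransformer.
class DedupTransformer[K, V](storeName: String)
    extends Transformer[Windowed[K], V, KeyValue[Windowed[K], V]] {

  private var context: ProcessorContext = _
  private var store: WindowStore[K, (V, Int)] = _

  // Called once per task: each instance looks up its own task-local store.
  override def init(ctx: ProcessorContext): Unit = {
    context = ctx
    store = ctx.getStateStore(storeName).asInstanceOf[WindowStore[K, (V, Int)]]
  }

  override def transform(key: Windowed[K], value: V): KeyValue[Windowed[K], V] = {
    store.put(key.key(), (value, context.partition()), key.window().start())
    null // nothing is forwarded 1:1; forwarding would happen in a punctuator
  }

  // The store is managed by Kafka Streams, so it is not closed here.
  override def close(): Unit = ()
}

// Returns a NEW transformer on every get(), so no state is shared across tasks.
class DedupTransformerSupplier[K, V](storeName: String)
    extends TransformerSupplier[Windowed[K], V, KeyValue[Windowed[K], V]] {
  override def get(): Transformer[Windowed[K], V, KeyValue[Windowed[K], V]] =
    new DedupTransformer[K, V](storeName)
}
```

With an API that accepts a supplier (such as the Java KStream#transform), the supplier would be passed in place of `new MyTransformer(...)`, together with the store name.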
      

      INB4: I don't close any state stores manually, and I gave them the longest possible retention time for the debugging stage. I tried to hotfix the problem with a retry in the transform method; the state stores do reopen eventually and the `put` call then succeeds, but this approach is questionable and I am not sure it actually works.

      Edit:
      Could this be because StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG is set to a low value? If I understand correctly, spilling to disk then happens more frequently; could that cause the store to be closed?
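
For reference, a minimal sketch of where the two settings discussed in this report live. The application id, broker address, thread count, and cache size are all hypothetical placeholders. As far as I know, the cache size controls how often cached records are flushed and forwarded, and a store built with Stores.windowStoreBuilder only uses the cache if withCachingEnabled() is called on the builder, so this setting may not affect "MyStore" at all.

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object StreamsProps {
  val props = new Properties()
  // Hypothetical values, for illustration only.
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // The error in this report only shows up with more than one stream thread.
  props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")
  // Total record-cache size in bytes, shared across all threads of the instance.
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10L * 1024 * 1024).toString)
}
```

Running the same topology with the thread count set to "1" and then "2", leaving the cache size unchanged, would help separate the two suspected causes.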

            People

              Assignee: Unassigned
              Reporter: Mateusz Owczarek (nijo)