KAFKA-9598

RocksDB exception when grouping dynamically appearing topics into a KTable


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0, 2.4.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      A Kafka Streams application consumes a number of topics via a whitelisted regex. The topics appear dynamically, generated from dynamically appearing MongoDB collections by the Debezium MongoDB source connector.

      Development runs on Debezium Docker images (Debezium 0.9 and Debezium 1.0, i.e. Kafka 2.2.0 and 2.4.0), with a single instance each of Kafka, Connect, and the Streams consumer application.

      As the MongoDB connector provides only deltas of the changes, the code creates a KTable to collect the full record for each key; the KTable is then transformed into a KStream for further joining with other KTables and GlobalKTables.
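
      For illustration, a minimal sketch of what a recursive JSON traverse/merge procedure like mergeNodes could look like, using Jackson (hypothetical: the real implementation is in the application, and the merge semantics shown here are an assumption):

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.node.ObjectNode;

      // Hypothetical sketch: fields from the newer delta overwrite or extend the
      // aggregate, nested objects are merged recursively, and for scalar conflicts
      // the newer value wins.
      static JsonNode mergeNodes(JsonNode aggValue, JsonNode newValue) {
          if (aggValue != null && aggValue.isObject() && newValue.isObject()) {
              ObjectNode merged = ((ObjectNode) aggValue).deepCopy();
              newValue.fields().forEachRemaining(entry -> {
                  JsonNode existing = merged.get(entry.getKey());
                  if (existing != null && existing.isObject() && entry.getValue().isObject()) {
                      merged.set(entry.getKey(), mergeNodes(existing, entry.getValue()));
                  } else {
                      merged.set(entry.getKey(), entry.getValue());
                  }
              });
              return merged;
          }
          return newValue;
      }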

      The following piece of code results in the exception when a new topic is added:

       

      Pattern tResultPattern =
          Pattern.compile(config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
      KStream<String, JsonNode> tResultsTempStream =
          builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
      // mergeNodes is a JSON traverse/merge procedure
      KTable<String, JsonNode> tResultsTempTable = tResultsTempStream
          .groupByKey(Grouped.with(stringSerde, jsonSerde))
          .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue));
      KStream<String, JsonNode> tResults = tResultsTempTable.toStream();

      kconsumer_1 | Exception in thread "split-reader-client3-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-REDUCE-STATE-STORE-0000000032 at location /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032

      ...

      kconsumer_1 | Caused by: org.rocksdb.RocksDBException: lock : /tmp/split-reader3/10_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000032/LOCK: No locks available

      I checked that state store 10_0 contains the tr[0-9a-fA-F]{32} topic records.

      More details about the exception are in the attached file.

      The exception is no longer present when I use an intermediate topic instead:

       

      Pattern tResultPattern =
          Pattern.compile(config.getProperty("mongodb_source_prefix") + ".tr[0-9a-fA-F]{32}");
      KStream<String, JsonNode> tResultsTempStream =
          builder.stream(tResultPattern, Consumed.with(stringSerde, jsonSerde));
      // trTransformer adds the topic name into the value JSON; in the previous
      // snippet this was done in the pipeline after grouping/streaming
      tResultsTempStream
          .transform(trTransformer::new)
          .to(config.getProperty("tr_intermediate_topic_name"), Produced.with(stringSerde, jsonSerde));
      KStream<String, JsonNode> tResultsTempStream2 =
          builder.stream(config.getProperty("tr_intermediate_topic_name"), Consumed.with(stringSerde, jsonSerde));
      KTable<String, JsonNode> tResultsTempTable = tResultsTempStream2
          .groupByKey(Grouped.with(stringSerde, jsonSerde))
          .reduce((aggValue, newValue) -> mergeNodes(aggValue, newValue));
      KStream<String, JsonNode> tResults = tResultsTempTable.toStream();
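
      The trTransformer itself is not attached; a minimal sketch of a transformer that stamps the source topic name into the value JSON (hypothetical: the class body and the "source_topic" field name are assumptions) could look like:

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.node.ObjectNode;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.kstream.Transformer;
      import org.apache.kafka.streams.processor.ProcessorContext;

      // Hypothetical sketch: records from many whitelisted topics are funneled into
      // one intermediate topic, so each value is tagged with its original topic name.
      public class trTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
          private ProcessorContext context;

          @Override
          public void init(ProcessorContext context) {
              this.context = context; // exposes the source topic of the current record
          }

          @Override
          public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
              if (value == null || !value.isObject()) {
                  return KeyValue.pair(key, value); // pass non-object values through unchanged
              }
              ObjectNode enriched = ((ObjectNode) value).deepCopy();
              enriched.put("source_topic", context.topic()); // field name is an assumption
              return KeyValue.pair(key, enriched);
          }

          @Override
          public void close() { }
      }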
      

       

       

      If building a KTable from multiple whitelisted topics is outside the scope of Kafka Streams, perhaps it would make sense to mention that in the docs.

      Attachments

        1. exception-details.txt (11 kB, Sergey Menshikov)


          People

            Assignee: Unassigned
            Reporter: Sergey Menshikov (sergem)
            Votes: 0
            Watchers: 4
