FLINK-17800: RocksDB optimizeForPointLookup results in missing time windows


Details

    Release Note:

      After FLINK-17800, we set `setTotalOrderSeek` to true by default for RocksDB's `ReadOptions`, to prevent users from misusing `optimizeForPointLookup`. Meanwhile, we support customizing `ReadOptions` through `RocksDBOptionsFactory`. Please set `setTotalOrderSeek` back to false if any performance regression is observed (this normally won't happen, according to our testing).
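
      For reference, a minimal sketch of such a factory, assuming Flink 1.11's `RocksDBOptionsFactory` interface (with the `createReadOptions` hook this ticket adds) and the RocksDB JNI classes; the checkpoint URI and cache size are placeholders, and reverting `setTotalOrderSeek` is only safe if your access pattern tolerates prefix seek:

       import java.util.{Collection => JCollection}
       import org.apache.flink.contrib.streaming.state.{RocksDBOptionsFactory, RocksDBStateBackend}
       import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
       import org.rocksdb.{ColumnFamilyOptions, DBOptions, ReadOptions}

       // Keeps optimizeForPointLookup on the column families but opts iterators
       // back out of the (safe) total-order-seek default.
       class PointLookupOptionsFactory extends RocksDBOptionsFactory {

         override def createDBOptions(currentOptions: DBOptions,
                                      handlesToClose: JCollection[AutoCloseable]): DBOptions =
           currentOptions

         override def createColumnOptions(currentOptions: ColumnFamilyOptions,
                                          handlesToClose: JCollection[AutoCloseable]): ColumnFamilyOptions =
           currentOptions.optimizeForPointLookup(256) // block-cache size in MB, placeholder value

         override def createReadOptions(currentOptions: ReadOptions,
                                        handlesToClose: JCollection[AutoCloseable]): ReadOptions =
           currentOptions.setTotalOrderSeek(false) // revert to the pre-FLINK-17800 behavior
       }

       // Hypothetical wiring; the checkpoint path is a placeholder.
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
       backend.setRocksDBOptions(new PointLookupOptionsFactory)
       env.setStateBackend(backend)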

    Description

      My Setup:

      We have been using the RocksDB option optimizeForPointLookup and running Flink 1.7 for years. Upon upgrading to Flink 1.10 we started seeing strange behavior: missing time windows on a streaming Flink job. To narrow it down, I experimented with previous Flink versions (1.8, 1.9, 1.9.3), and none of them showed the problem.

       

      A sample of the code demonstrating the problem is here:

 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.time.Time

 val datastream = env
   .addSource(KafkaSource.keyedElements(config.kafkaElements, List(config.kafkaBootstrapServer)))

 val result = datastream
   .keyBy(_ => 1)                     // a single key, so all elements share the same windows
   .timeWindow(Time.milliseconds(1))  // 1 ms event-time windows
   .reduce((a, _) => a)               // trivial window function so the windowed stream can be printed
   .print()

      The source consists of 3 streams (either 3 Kafka partitions or 3 Kafka topics), and the elements in each stream have separately increasing timestamps. Event-time timestamps start from 1 and increase by 1 across the source as a whole, but are interleaved across the streams: the first partition would contain timestamps 1, 2, 10, 15..., the second 4, 5, 6, 11..., the third 3, 7, 8, 9...
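
      For what it's worth, a minimal Kafka-free sketch of such a source, under my own assumptions (element type (Int, Long) as (value, event timestamp), Flink 1.10's Scala DataStream API, and a bounded-out-of-orderness extractor standing in for the per-partition Kafka watermarks):

       import org.apache.flink.streaming.api.TimeCharacteristic
       import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
       import org.apache.flink.streaming.api.scala._
       import org.apache.flink.streaming.api.windowing.time.Time

       val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

       // Interleaved event times, as in the three Kafka partitions above.
       val partition1 = Seq(1L, 2L, 10L, 15L)
       val partition2 = Seq(4L, 5L, 6L, 11L)
       val partition3 = Seq(3L, 7L, 8L, 9L)

       val elements = (partition1 ++ partition2 ++ partition3).map(ts => (ts.toInt, ts))

       val datastream = env
         .fromCollection(elements)
         .assignTimestampsAndWatermarks(
           new BoundedOutOfOrdernessTimestampExtractor[(Int, Long)](Time.milliseconds(20)) {
             override def extractTimestamp(e: (Int, Long)): Long = e._2
           })

       datastream
         .keyBy(_ => 1)
         .timeWindow(Time.milliseconds(1))
         .reduce((a, _) => a)
         .print()

       env.execute("missing-windows-repro")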

       

      What I observe:

      The time windows open as I expect for the first 127 timestamps. Then there is a huge gap with no opened windows; if the source has many elements, the next window to open has a timestamp in the thousands. A gap of hundreds of elements is created with what appear to be 'lost' elements. Those elements are not reported as late either (tested with the .sideOutputLateData operator; see the sketch below). The way we have been using the option is by setting it inside the config like so:

      etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456

      We have been using it for performance reasons, as we have a huge RocksDB state backend.
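
      The late-data check mentioned above can be sketched like this (the lateTag name is mine; datastream and the window pipeline come from the code sample above):

       import org.apache.flink.streaming.api.scala._
       import org.apache.flink.streaming.api.windowing.time.Time

       val lateTag = OutputTag[(Int, Long)]("late-elements")

       val windowed = datastream
         .keyBy(_ => 1)
         .timeWindow(Time.milliseconds(1))
         .sideOutputLateData(lateTag)   // route anything dropped as late to the tag
         .reduce((a, _) => a)

       windowed.print()                 // shows the gap in opened windows
       windowed.getSideOutput(lateTag)  // stays empty: the missing elements are not reported late
         .print()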

      Attachments

        1. MyMissingWindows.scala
          3 kB
          Yordan Pavlov
        2. MyMissingWindows.scala
          2 kB
          Yun Tang
        3. MissingWindows.scala
          2 kB
          Yordan Pavlov

            People

              Assignee: Yun Tang (yunta)
              Reporter: Yordan Pavlov (YordanPavlov)
              Votes: 1
              Watchers: 12
