FLINK-26585 (project: Flink)

State Processor API: Loading a state set buffers the whole state set in memory before starting to process


Details

    Description

      • When loading a state, MultiStateKeyIterator loads and buffers the whole state in memory before it even processes a single data point
        • This is no problem at all for small states (which is why the unit tests pass)
        • The MultiStateKeyIterator constructor sets up a Java Stream that iterates over all state descriptors and flattens the data points contained in each of them
        • When that stream is later consumed through its iterator, java.util.stream.Stream#flatMap pushes every element of an inner stream into the iterator's internal buffer at once, so the whole state is buffered before the first data point is returned
        • See call stack [1] 
          • In our case this is 150e6 data points (> 1 GiB just for the references to the data, let alone the data itself, ~30 GiB)
        • I'm not aware of any way to instrument Stream to avoid the problem, hence
        • I coded an alternative implementation of MultiStateKeyIterator that avoids Java streams;
        • I can contribute our implementation (MultiStateKeyIteratorNoStreams)
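      A minimal sketch of the stream-free idea, assuming each state can hand out a plain Iterator over its keys. This is not the attached MultiStateKeyIteratorNoStreams: the class name ConcatenatingKeyIterator and the keysOf function are hypothetical stand-ins for the state backend lookup. The iterator only opens the next state when the current one is exhausted, so keys are pulled one at a time from the per-state iterators instead of being buffered per state.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical sketch: concatenates the key iterators of several states
 * without java.util.stream, advancing lazily from one state to the next.
 */
final class ConcatenatingKeyIterator<S, K> implements Iterator<K> {
    private final Iterator<S> states;              // states not yet opened
    private final Function<S, Iterator<K>> keysOf; // assumed: opens a key iterator for one state
    private Iterator<K> current = Collections.emptyIterator();

    ConcatenatingKeyIterator(List<S> states, Function<S, Iterator<K>> keysOf) {
        this.states = states.iterator();
        this.keysOf = keysOf;
    }

    @Override
    public boolean hasNext() {
        // Open the next state only once the current one has no keys left;
        // empty states are skipped by looping.
        while (!current.hasNext()) {
            if (!states.hasNext()) {
                return false;
            }
            current = keysOf.apply(states.next());
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Exactly one key is taken from the underlying per-state iterator.
        return current.next();
    }
}
```

      Like the flatMap pipeline, this sketch does not deduplicate keys that occur in several states, and it does not implement remove().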

      [1]

      Streams call stack:

      hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
      next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
      forEachRemaining:116, Iterator (java.util)
      forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
      forEach:580, ReferencePipeline$Head (java.util.stream)
      accept:270, ReferencePipeline$7$1 (java.util.stream)    # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
      accept:373, ReferencePipeline$11$1 (java.util.stream)   # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
      accept:193, ReferencePipeline$3$1 (java.util.stream)    # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
      tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
      lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
      getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
      fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
      doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
      tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
      hasNext:681, Spliterators$1Adapter (java.util)
      hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
      hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
      reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
      invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
      doRun:776, Task (org.apache.flink.runtime.taskmanager)
      run:563, Task (org.apache.flink.runtime.taskmanager)
      run:748, Thread (java.lang)
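      The call stack can be reproduced without Flink. The sketch below (class name and key counts made up for illustration) concatenates two "states" with flatMap and asks the resulting Stream#iterator() for a single element, which is the same access path as MultiStateKeyIterator#hasNext in [1]. A counter inside the inner stream records how many keys were produced by then. On the JDK shown in the call stack, fillBuffer pushes one source element (one state) through flatMap, which forwards that state's entire inner stream into the buffer, so the count should equal all keys of the first state rather than 1; whether newer JDKs behave differently is not verified here.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class FlatMapBufferingDemo {
    /** Returns how many inner keys were produced before the first key was handed out. */
    static int producedBeforeFirstElement(int keysPerState) {
        AtomicInteger produced = new AtomicInteger();
        Iterator<Integer> it = List.of("state-a", "state-b").stream()
            // Each "state" contributes keysPerState keys; peek counts every key the inner stream emits.
            .flatMap(name -> IntStream.range(0, keysPerState)
                .peek(i -> produced.incrementAndGet())
                .boxed())
            .iterator();
        it.next(); // request just one key
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println("keys produced for one next(): " + producedBeforeFirstElement(1_000));
    }
}
```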

      Attachments

        1. MultiStateKeyIteratorNoStreams.java
          3 kB
          Matthias Schwalbe

            People

              Matthias Schwalbe
              Matthias Schwalbe
              Votes: 0
              Watchers: 5
