Flink / FLINK-4640

Serialization of the initialValue of a Fold on WindowedStream fails


    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Streaming


      The following program

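      DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));

      src
        .keyBy(0)                     // key and window are illustrative; the original
        .timeWindow(Time.minutes(5))  // report does not show these two calls
        .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
          @Override
          public TreeMultimap<Long, String> fold(
              TreeMultimap<Long, String> topKSoFar,
              Tuple2<String, Long> itemCount) throws Exception {
            String item = itemCount.f0;
            Long count = itemCount.f1;
            topKSoFar.put(count, item);  // the NullPointerException below is raised inside this call
            if (topKSoFar.keySet().size() > 10) {
              // assumed body: drop the entries with the smallest count to keep the top 10
              topKSoFar.removeAll(topKSoFar.keySet().first());
            }
            return topKSoFar;
          }
        });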

      throws this exception

      Caused by: java.lang.RuntimeException: Could not add value to folding state.
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
      at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
      at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
      ... 6 more

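      The exception occurs because the initial value is not deserialized correctly. Since the NullPointerException is raised inside TreeMultimap.put() rather than directly in fold(), the deserialized multimap object exists but its internal state appears to be null.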

      The user reporting this issue said that using the same FoldFunction on a KeyedStream (without a window) works fine.

      I tracked the problem down to the serialization of the StateDescriptor, i.e., its writeObject() and readObject() methods. These methods use Flink's TypeSerializers to serialize the default value. For a TreeMultimap, this is the KryoSerializer, which for some reason fails to read the serialized data back correctly.
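
      A minimal sketch of the pattern described above, under stated assumptions: a descriptor keeps its default value in a transient field and, in writeObject()/readObject(), writes it through a type-specific codec instead of plain Java serialization. DescriptorSketch and ValueCodec are hypothetical names standing in for StateDescriptor and TypeSerializer; they are not Flink's API, and the sketch does not reproduce the Kryo failure. It only shows the point at which a codec that cannot rebuild the value would hand a broken object to the fold.

```java
import java.io.*;

/**
 * Hypothetical sketch (not Flink's StateDescriptor): the default value is
 * transient and travels through a pluggable codec in writeObject()/readObject().
 */
public class DescriptorSketch<T> implements Serializable {

    /** Hypothetical stand-in for a type-specific serializer such as a TypeSerializer. */
    public interface ValueCodec<V> extends Serializable {
        void write(V value, DataOutputStream out) throws IOException;
        V read(DataInputStream in) throws IOException;
    }

    /** Example codec for Strings. */
    public static class StringCodec implements ValueCodec<String> {
        public void write(String value, DataOutputStream out) throws IOException { out.writeUTF(value); }
        public String read(DataInputStream in) throws IOException { return in.readUTF(); }
    }

    private final ValueCodec<T> codec;
    private transient T defaultValue; // skipped by default Java serialization

    public DescriptorSketch(ValueCodec<T> codec, T defaultValue) {
        this.codec = codec;
        this.defaultValue = defaultValue;
    }

    public T getDefaultValue() { return defaultValue; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // writes the non-transient fields (the codec)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream view = new DataOutputStream(bytes);
        codec.write(defaultValue, view);
        view.flush();
        out.writeInt(bytes.size()); // length prefix, then the codec's bytes
        out.write(bytes.toByteArray());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores the codec
        byte[] buf = new byte[in.readInt()];
        in.readFully(buf);
        // The default value is only as good as the codec: a codec that cannot
        // rebuild the object's internal state hands back a broken value here.
        defaultValue = codec.read(new DataInputStream(new ByteArrayInputStream(buf)));
    }

    /** Round-trips a descriptor through Java serialization. */
    @SuppressWarnings("unchecked")
    public static <U> DescriptorSketch<U> roundTrip(DescriptorSketch<U> descriptor) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(descriptor);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DescriptorSketch<U>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        DescriptorSketch<String> copy = roundTrip(new DescriptorSketch<>(new StringCodec(), "initial"));
        System.out.println(copy.getDefaultValue()); // prints "initial"
    }
}
```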

      A quick workaround would be to check whether the default value implements Serializable and, if so, use Java serialization. However, it would be good to track down the root cause of this problem.
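
      The proposed workaround can be sketched as follows, assuming a one-byte tag records which path was used so the reader knows how to decode. DefaultValueCodec, its Codec interface and the UNSUPPORTED fallback are hypothetical illustrations, not Flink code; java.util.TreeMap stands in for Guava's TreeMultimap (both implement java.io.Serializable) so that the example needs no external dependency.

```java
import java.io.*;
import java.util.Arrays;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the workaround: Java serialization for Serializable
 * default values, a type-specific fallback codec for everything else.
 */
public class DefaultValueCodec {

    /** Hypothetical fallback serializer for values that are not Serializable. */
    public interface Codec {
        byte[] toBytes(Object value) throws IOException;
        Object fromBytes(byte[] bytes) throws IOException;
    }

    /** Fallback used in the example: refuses every value. */
    public static final Codec UNSUPPORTED = new Codec() {
        public byte[] toBytes(Object value) throws IOException {
            throw new NotSerializableException(value.getClass().getName());
        }
        public Object fromBytes(byte[] bytes) throws IOException {
            throw new IOException("no fallback codec");
        }
    };

    private static final byte JAVA_SERIALIZATION = 1;
    private static final byte FALLBACK_CODEC = 2;

    public static byte[] encode(Object value, Codec fallback) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        if (value instanceof Serializable) {
            bytes.write(JAVA_SERIALIZATION); // tag tells decode() which path was used
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
        } else {
            bytes.write(FALLBACK_CODEC);
            bytes.write(fallback.toBytes(value));
        }
        return bytes.toByteArray();
    }

    public static Object decode(byte[] data, Codec fallback) throws IOException, ClassNotFoundException {
        byte[] payload = Arrays.copyOfRange(data, 1, data.length); // strip the tag
        if (data[0] == JAVA_SERIALIZATION) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                return in.readObject();
            }
        }
        return fallback.fromBytes(payload);
    }

    /** encode() followed by decode(), with checked exceptions wrapped. */
    public static Object roundTrip(Object value, Codec fallback) {
        try {
            return decode(encode(value, fallback), fallback);
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> topK = new TreeMap<>();
        topK.put(1L, "a");
        System.out.println(roundTrip(topK, UNSUPPORTED)); // prints {1=a}
    }
}
```

      Java serialization only sidesteps the type serializer for values that implement Serializable; any other default value still goes through the fallback codec, which is why finding the root cause remains worthwhile.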




            • Assignee:
              Stephan Ewen (StephanEwen)
              Fabian Hueske (fhueske)


              • Created: