FLINK-4640

Serialization of the initialValue of a Fold on WindowedStream fails

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Streaming
    • Labels:
      None

      Description

      The following program:

      DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));
      
      src
        .keyBy(1)
        .timeWindow(Time.minutes(5))
        .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
          @Override
          public TreeMultimap<Long, String> fold(
              TreeMultimap<Long, String> topKSoFar, 
              Tuple2<String, Long> itemCount) throws Exception 
          {
            String item = itemCount.f0;
            Long count = itemCount.f1;
            topKSoFar.put(count, item);
            if (topKSoFar.keySet().size() > 10) {
              topKSoFar.removeAll(topKSoFar.keySet().first());
            }
            return topKSoFar;
          }
      });
      

      throws this exception:

      Caused by: java.lang.RuntimeException: Could not add value to folding state.
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
      at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
      at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
      ... 6 more

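      The exception occurs because the initial value is not deserialized correctly. Note that the NullPointerException is thrown inside TreeMultimap.put() rather than in the fold function itself, which suggests that the deserialized multimap is left with uninitialized (null) internal state.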
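      To illustrate how a deserialized object can be non-null and still fail inside its own methods, here is a minimal sketch using only the JDK. The class `Bag` is hypothetical and merely stands in for a collection whose backing map is not restored on read; it is not a claim about how TreeMultimap or the KryoSerializer actually behave.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

public class TransientStateDemo {

    /** Hypothetical container: the backing map is transient, so default serialization skips it. */
    static class Bag implements Serializable {
        private static final long serialVersionUID = 1L;

        // Field initializers do not run when Java deserialization creates the object.
        private transient TreeMap<Long, String> map = new TreeMap<>();

        void put(Long key, String value) {
            map.put(key, value); // throws NullPointerException if 'map' was never restored
        }
    }

    /** Serializes and deserializes the given bag with plain Java serialization. */
    static Bag roundTrip(Bag bag) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bag);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Bag) in.readObject();
        }
    }

    /** Returns true if calling put() on a restored copy fails with a NullPointerException. */
    static boolean restoredCopyIsBroken() throws IOException, ClassNotFoundException {
        Bag copy = roundTrip(new Bag());
        try {
            copy.put(1L, "a");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Bag copy = roundTrip(new Bag());
        System.out.println("copy is null: " + (copy == null));
        System.out.println("put() fails on copy: " + restoredCopyIsBroken());
    }
}
```

      In this sketch the failure surfaces one frame below the caller, inside `put()`, which is the same shape as the stack trace above.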

      The user reporting this issue said that using the same FoldFunction on a KeyedStream (without a window) works fine.

      I tracked the problem down to the serialization of the StateDescriptor, i.e., its writeObject() and readObject() methods. These methods use Flink's TypeSerializers to serialize the default value. For TreeMultimap this is the KryoSerializer, which fails to read the serialized data back for a reason that has not been identified yet.

      A quick workaround would be to check whether the default value implements Serializable and, if so, use Java serialization instead. However, it would be good to track down the root cause of this problem.
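      The proposed workaround could look roughly like the following sketch. It uses only the JDK and is not Flink's actual StateDescriptor code or the fix that was eventually applied: `DefaultValueHolder` and `FallbackSerializer` are hypothetical names, with `FallbackSerializer` standing in for a framework serializer such as a TypeSerializer. In the usage example a `TreeMap` stands in for the TreeMultimap from the report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

public class DefaultValueHolder<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for a framework serializer (for example a TypeSerializer). */
    public interface FallbackSerializer<T> extends Serializable {
        void write(T value, DataOutputStream out) throws IOException;
        T read(DataInputStream in) throws IOException;
    }

    // Tags written in front of the default value to record how it was encoded.
    private static final byte TAG_NULL = 0;
    private static final byte TAG_JAVA = 1;
    private static final byte TAG_FALLBACK = 2;

    private final FallbackSerializer<T> fallback; // may be null if never needed
    private transient T defaultValue;             // written manually in writeObject()

    public DefaultValueHolder(T defaultValue, FallbackSerializer<T> fallback) {
        this.defaultValue = defaultValue;
        this.fallback = fallback;
    }

    public T getDefaultValue() {
        return defaultValue;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        if (defaultValue == null) {
            out.writeByte(TAG_NULL);
        } else if (defaultValue instanceof Serializable) {
            // Workaround path: prefer Java serialization when the value supports it.
            out.writeByte(TAG_JAVA);
            out.writeObject(defaultValue);
        } else if (fallback != null) {
            out.writeByte(TAG_FALLBACK);
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream data = new DataOutputStream(buffer);
            fallback.write(defaultValue, data);
            data.flush();
            byte[] bytes = buffer.toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
        } else {
            throw new IOException("Default value is not Serializable and no fallback was given");
        }
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        byte tag = in.readByte();
        if (tag == TAG_NULL) {
            defaultValue = null;
        } else if (tag == TAG_JAVA) {
            defaultValue = (T) in.readObject();
        } else {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            defaultValue = fallback.read(new DataInputStream(new ByteArrayInputStream(bytes)));
        }
    }

    /** Round-trips a holder through Java serialization, as happens when a job is shipped. */
    @SuppressWarnings("unchecked")
    public static <T> DefaultValueHolder<T> copy(DefaultValueHolder<T> holder)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DefaultValueHolder<T>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TreeMap<Long, String> initial = new TreeMap<>();
        initial.put(1L, "a");
        DefaultValueHolder<TreeMap<Long, String>> restored =
            copy(new DefaultValueHolder<>(initial, null));
        System.out.println(initial.equals(restored.getDefaultValue()));
    }
}
```

      The tag byte keeps the two encodings distinguishable on read. As noted above, this only sidesteps the failing serializer for values that happen to be Serializable and leaves the root cause open.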

            People

            • Assignee: Stephan Ewen (StephanEwen)
            • Reporter: Fabian Hueske (fhueske)