Flink / FLINK-4640

Serialization of the initialValue of a Fold on WindowedStream fails

Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.2, 1.2.0
    • Fix Version/s: 1.1.3, 1.2.0
    • Component/s: API / DataStream
    • Labels: None

    Description

      The following program

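      import com.google.common.collect.TreeMultimap;
      import org.apache.flink.api.common.functions.FoldFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.time.Time;

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));

      src
        .keyBy(1)
        .timeWindow(Time.minutes(5))
        // The initial value is an empty Guava TreeMultimap (count -> item).
        .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
          @Override
          public TreeMultimap<Long, String> fold(
              TreeMultimap<Long, String> topKSoFar,
              Tuple2<String, Long> itemCount) throws Exception
          {
            String item = itemCount.f0;
            Long count = itemCount.f1;
            topKSoFar.put(count, item);
            // Keep at most 10 distinct counts by evicting the smallest one.
            if (topKSoFar.keySet().size() > 10) {
              topKSoFar.removeAll(topKSoFar.keySet().first());
            }
            return topKSoFar;
          }
        });

      env.execute();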
      

      throws this exception:

      Caused by: java.lang.RuntimeException: Could not add value to folding state.
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
      at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
      at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
      ... 6 more

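      The exception is caused because the initial value is not correctly deserialized. The NullPointerException is raised inside TreeMultimap.put() rather than directly in fold(), so the deserialized multimap object is not itself null; its internal state was not restored.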

      The user reporting this issue said that using the same FoldFunction on a KeyedStream (without a window) works fine.

      I tracked the problem down to the serialization of the StateDescriptor, i.e., its writeObject() and readObject() methods. These methods use Flink's TypeSerializers to serialize the default value. In the case of TreeMultimap, this is the KryoSerializer, which fails to read the serialized data for some reason.
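      A minimal sketch of the pattern described above, in plain Java without Flink: the descriptor marks its default value transient and, in writeObject()/readObject(), routes it through a pluggable serializer as a length-prefixed byte array. ValueSerializer, StringListSerializer, Descriptor and roundTrip are hypothetical names standing in for Flink's TypeSerializer and StateDescriptor, and the byte layout is an assumption, not Flink's actual format. It shows why the problem only appears after the descriptor is shipped: the default value survives the Java-serialization round trip only if the serializer's read path exactly mirrors its write path.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DescriptorSerializationSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer: writes/reads one value on a data stream. */
    interface ValueSerializer<T> extends Serializable {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    /** Example serializer for List<String>: a size prefix followed by the elements. */
    static final class StringListSerializer implements ValueSerializer<List<String>> {
        public void serialize(List<String> value, DataOutput out) throws IOException {
            out.writeInt(value.size());
            for (String s : value) {
                out.writeUTF(s);
            }
        }

        public List<String> deserialize(DataInput in) throws IOException {
            int size = in.readInt();
            List<String> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                result.add(in.readUTF());
            }
            return result;
        }
    }

    /** Hypothetical descriptor: the default value is transient and goes through the serializer. */
    static final class Descriptor<T> implements Serializable {
        private final ValueSerializer<T> serializer;
        private transient T defaultValue;

        Descriptor(ValueSerializer<T> serializer, T defaultValue) {
            this.serializer = serializer;
            this.defaultValue = defaultValue;
        }

        T getDefaultValue() { return defaultValue; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject(); // writes the non-transient fields (the serializer)
            out.writeBoolean(defaultValue != null);
            if (defaultValue != null) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                DataOutputStream dataOut = new DataOutputStream(buf);
                serializer.serialize(defaultValue, dataOut);
                dataOut.flush();
                out.writeInt(buf.size());     // length prefix ...
                out.write(buf.toByteArray()); // ... followed by the serializer's bytes
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            if (in.readBoolean()) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                // This must mirror serialize() exactly, or the default value comes back broken.
                defaultValue = serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
            } // otherwise defaultValue stays null, as transient fields do after deserialization
        }
    }

    /** Java-serializes an object and reads it back, roughly what shipping an operator to a task does. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<String> initial = new ArrayList<>(Arrays.asList("a", "b"));
        Descriptor<List<String>> original = new Descriptor<>(new StringListSerializer(), initial);
        Descriptor<List<String>> copy = roundTrip(original);
        System.out.println(copy.getDefaultValue()); // prints [a, b]
    }
}
```

      With a serializer whose deserialize() does not restore everything serialize() wrote, copy.getDefaultValue() would come back incomplete, which would be consistent with the symptom reported here.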

      A quick workaround would be to check whether the default value implements Serializable and, if so, use Java serialization instead. However, the root cause of the problem should still be tracked down.
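      The workaround proposed above could look roughly like this: write a tag byte, then use Java serialization when the default value implements Serializable and fall back to the type serializer otherwise. ValueSerializer, Counter, CounterSerializer, Descriptor and the tag constants are hypothetical; this is a sketch of the idea, not the change that was made in Flink. Guava's TreeMultimap implements Serializable, so under this scheme it would take the Java-serialization path.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializableFallbackSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface ValueSerializer<T> extends Serializable {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    /** Deliberately NOT Serializable, so its default value has to take the serializer path. */
    static final class Counter {
        final int value;
        Counter(int value) { this.value = value; }
    }

    static final class CounterSerializer implements ValueSerializer<Counter> {
        public void serialize(Counter c, DataOutput out) throws IOException { out.writeInt(c.value); }
        public Counter deserialize(DataInput in) throws IOException { return new Counter(in.readInt()); }
    }

    // Tag byte written before the default value, telling readObject() how it was encoded.
    static final byte NO_DEFAULT = 0, JAVA_SERIALIZATION = 1, TYPE_SERIALIZER = 2;

    static final class Descriptor<T> implements Serializable {
        private final ValueSerializer<T> serializer;
        private transient T defaultValue;
        private transient byte readMode; // set by readObject(): which path restored the value

        Descriptor(ValueSerializer<T> serializer, T defaultValue) {
            this.serializer = serializer;
            this.defaultValue = defaultValue;
        }

        T getDefaultValue() { return defaultValue; }
        byte getReadMode() { return readMode; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            if (defaultValue == null) {
                out.writeByte(NO_DEFAULT);
            } else if (defaultValue instanceof Serializable) {
                // Workaround: Serializable values use plain Java serialization.
                out.writeByte(JAVA_SERIALIZATION);
                out.writeObject(defaultValue);
            } else {
                // Fallback: length-prefixed bytes produced by the type serializer.
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                DataOutputStream dataOut = new DataOutputStream(buf);
                serializer.serialize(defaultValue, dataOut);
                dataOut.flush();
                out.writeByte(TYPE_SERIALIZER);
                out.writeInt(buf.size());
                out.write(buf.toByteArray());
            }
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            readMode = in.readByte();
            if (readMode == JAVA_SERIALIZATION) {
                defaultValue = (T) in.readObject();
            } else if (readMode == TYPE_SERIALIZER) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                defaultValue = serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
            } // NO_DEFAULT: defaultValue stays null, as transient fields do after deserialization
        }
    }

    /** Java-serializes an object and reads it back. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b"));
        // The serializer is never consulted for a Serializable default, so null is passed here.
        Descriptor<List<String>> listCopy = roundTrip(new Descriptor<List<String>>(null, items));
        Descriptor<Counter> counterCopy = roundTrip(new Descriptor<Counter>(new CounterSerializer(), new Counter(42)));
        System.out.println(listCopy.getDefaultValue() + " via mode " + listCopy.getReadMode()); // [a, b] via mode 1
        System.out.println(counterCopy.getDefaultValue().value + " via mode " + counterCopy.getReadMode()); // 42 via mode 2
    }
}
```

      The check only sidesteps the type serializer for Serializable values; anything else still depends on it, which is why the root cause still matters.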

    People

      Assignee: Stephan Ewen (sewen)
      Reporter: Fabian Hueske (fhueske)
      Votes: 0
      Watchers: 4
