Flink / FLINK-4640

Serialization of the initialValue of a Fold on WindowedStream fails

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Streaming
    • Labels:
      None

      Description

      The following program

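      import com.google.common.collect.TreeMultimap;
      import org.apache.flink.api.common.functions.FoldFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.time.Time;

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));

      src
        .keyBy(1) // key by tuple field 1 (the Long count)
        .timeWindow(Time.minutes(5))
        .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
          @Override
          public TreeMultimap<Long, String> fold(
              TreeMultimap<Long, String> topKSoFar,
              Tuple2<String, Long> itemCount) throws Exception
          {
            String item = itemCount.f0;
            Long count = itemCount.f1;
            topKSoFar.put(count, item);
            // keep only the items for the 10 largest distinct counts
            if (topKSoFar.keySet().size() > 10) {
              topKSoFar.removeAll(topKSoFar.keySet().first());
            }
            return topKSoFar;
          }
        })
        .print();

      env.execute();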
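For reference, the fold logic itself does not depend on Flink. The following JDK-only sketch reproduces what the FoldFunction computes, keeping the items for the 10 largest distinct counts. A `TreeMap<Long, TreeSet<String>>` stands in for Guava's `TreeMultimap<Long, String>`, and `TopKFold`, `foldOne`, and `foldAll` are hypothetical names chosen for illustration. Because nothing is serialized here, the accumulator is the object that was actually constructed.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class TopKFold {

    // Maximum number of distinct counts to keep, mirroring the "> 10" check in the FoldFunction.
    static final int MAX_KEYS = 10;

    // One fold step: record the item under its count, then evict the smallest count if too many remain.
    static TreeMap<Long, TreeSet<String>> foldOne(TreeMap<Long, TreeSet<String>> topKSoFar,
                                                  String item, long count) {
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > MAX_KEYS) {
            // Same effect as topKSoFar.removeAll(topKSoFar.keySet().first()) on the multimap.
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    // Folds all (item, count) pairs, starting from an empty accumulator (the "initialValue").
    static TreeMap<Long, TreeSet<String>> foldAll(List<Map.Entry<String, Long>> items) {
        TreeMap<Long, TreeSet<String>> acc = new TreeMap<>();
        for (Map.Entry<String, Long> e : items) {
            acc = foldOne(acc, e.getKey(), e.getValue());
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> items = new ArrayList<>();
        for (long i = 1; i <= 12; i++) {
            items.add(new AbstractMap.SimpleEntry<String, Long>("item" + i, i));
        }
        // Counts 1 and 2 are evicted once more than 10 distinct counts have been seen.
        System.out.println(foldAll(items).keySet()); // prints [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    }
}
```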
      

      throws this exception

      Caused by: java.lang.RuntimeException: Could not add value to folding state.
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
      at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
      at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
      at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
      at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
      ... 6 more

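      The exception occurs because the initial value is not deserialized correctly. The NullPointerException is thrown inside TreeMultimap.put(), so the multimap reference itself is not null; its internal state is what is broken.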

      The user reporting this issue said that using the same FoldFunction on a KeyedStream (without a window) works fine.

      I tracked the problem down to the serialization of the StateDescriptor, i.e., its writeObject() and readObject() methods. These methods use Flink's TypeSerializers to serialize the default value. In the case of the TreeMultimap, this is the KryoSerializer, which fails to read the serialized data for a reason I have not yet identified.
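The mechanism described above, a `Serializable` descriptor whose default value is written by a pluggable type serializer inside custom `writeObject()`/`readObject()` hooks, can be sketched with the JDK alone. `ValueSerializer`, `StringListSerializer`, and `Descriptor` are hypothetical stand-ins, not Flink's actual `TypeSerializer` or `StateDescriptor` APIs. The point of the sketch is that the descriptor's round trip is only as good as the type serializer's `deserialize()`: if that returns a broken object, the descriptor has no way to notice.

```java
import java.io.*;
import java.util.*;

public class DescriptorSketch {

    // Hypothetical stand-in for a type serializer; NOT Flink's TypeSerializer API.
    interface ValueSerializer<T> extends Serializable {
        void serialize(T value, DataOutput out) throws IOException;
        T deserialize(DataInput in) throws IOException;
    }

    // Example serializer for List<String>: writes the size, then each element.
    static class StringListSerializer implements ValueSerializer<List<String>> {
        @Override
        public void serialize(List<String> value, DataOutput out) throws IOException {
            out.writeInt(value.size());
            for (String s : value) {
                out.writeUTF(s);
            }
        }

        @Override
        public List<String> deserialize(DataInput in) throws IOException {
            int n = in.readInt();
            List<String> result = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                result.add(in.readUTF());
            }
            return result;
        }
    }

    // Hypothetical descriptor: the default value is transient and written by the type serializer.
    static class Descriptor<T> implements Serializable {
        private final ValueSerializer<T> serializer;
        private transient T defaultValue;

        Descriptor(ValueSerializer<T> serializer, T defaultValue) {
            this.serializer = serializer;
            this.defaultValue = defaultValue;
        }

        T getDefaultValue() {
            return defaultValue;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();                // writes the serializer field
            serializer.serialize(defaultValue, out); // default value goes through the type serializer
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            defaultValue = serializer.deserialize(in); // only as correct as deserialize() is
        }
    }

    // Java-serializes an object and reads it back, wrapping checked exceptions.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Descriptor<List<String>> original =
                new Descriptor<>(new StringListSerializer(), new ArrayList<>(Arrays.asList("a", "b")));
        System.out.println(roundTrip(original).getDefaultValue()); // prints [a, b]
    }
}
```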

      A quick workaround would be to check whether the default value implements Serializable and, if so, use Java serialization. However, it would be good to track down the root cause of this problem.
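The proposed workaround could look roughly like the following JDK-only sketch of that proposal: a flag byte records which path was taken, Serializable default values go through Java serialization, and everything else falls back to the type serializer. `DefaultValueCodec`, `writeDefault`, and `readDefault` are hypothetical names, and the `Function` parameters stand in for a real type serializer.

```java
import java.io.*;
import java.util.function.Function;

public class DefaultValueCodec {

    private static final byte JAVA = 1;     // payload written with Java serialization
    private static final byte FALLBACK = 2; // payload written by the (hypothetical) type serializer

    // Writes a flag byte, the payload length, and the payload.
    static byte[] writeDefault(Object value, Function<Object, byte[]> fallbackWriter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                byte[] payload;
                if (value instanceof Serializable) {
                    out.writeByte(JAVA);
                    ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
                    try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
                        oos.writeObject(value);
                    }
                    payload = javaBytes.toByteArray();
                } else {
                    out.writeByte(FALLBACK);
                    payload = fallbackWriter.apply(value);
                }
                out.writeInt(payload.length);
                out.write(payload);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the flag byte and dispatches to the matching deserialization path.
    static Object readDefault(byte[] data, Function<byte[], Object> fallbackReader) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte flag = in.readByte();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            if (flag == JAVA) {
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                    return ois.readObject();
                }
            }
            return fallbackReader.apply(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        java.util.TreeMap<Long, String> map = new java.util.TreeMap<>();
        map.put(1L, "a");
        // TreeMap is Serializable, so the fallback is never consulted.
        byte[] data = writeDefault(map, v -> { throw new IllegalStateException("fallback not expected"); });
        System.out.println(readDefault(data, p -> null)); // prints {1=a}
    }
}
```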

        Activity

        Stephan Ewen added a comment:

        I actually reproduced the error on BOTH the KeyedStream and the WindowedStream.

        The problem is that Kryo cannot properly serialize the TreeMultimap. It uses Objenesis to instantiate the map on deserialization, which bypasses the constructor and leaves a broken object whose internal state was never initialized; that object then causes the NullPointerException.
        That is a Kryo/Guava incompatibility. I'm not sure there is anything we can do directly about that.

        Should be fixable by registering a suitable serializer for the TreeMultimap:

        env.registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class);
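Why a constructor-bypassing instantiation yields a broken object, and why routing the type through Java serialization can help, can be reproduced with the JDK alone. This is an analogy, not Kryo or Objenesis: Java deserialization also skips the constructor and field initializers of a `Serializable` class and does not restore `transient` fields, so `BrokenBag` below comes back with a `null` backing map and throws a `NullPointerException` on `put()`, much like the multimap in the stack trace. `FixedBag` rebuilds its state in its own `readObject()` hook. Kryo's `JavaSerializer` delegates to Java serialization, so such a hook gets to run; assuming `TreeMultimap` restores its state the same way, that would explain why the registration works. `BrokenBag` and `FixedBag` are hypothetical classes.

```java
import java.io.*;
import java.util.*;

public class ConstructorBypassDemo {

    // The backing map is transient and set only by a field initializer, which deserialization skips.
    static class BrokenBag implements Serializable {
        private transient TreeMap<Long, List<String>> map = new TreeMap<>();

        void put(long key, String value) {
            // Throws NullPointerException when map was never initialized.
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    // Same layout, but the class writes and restores its state in its own hooks.
    static class FixedBag implements Serializable {
        private transient TreeMap<Long, List<String>> map = new TreeMap<>();

        void put(long key, String value) {
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        int size() {
            return map.values().stream().mapToInt(List::size).sum();
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(map); // write the contents explicitly
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            map = (TreeMap<Long, List<String>>) in.readObject(); // restore the internal state
        }
    }

    // Java-serializes an object and reads it back; the class's constructor is not run on the way back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        try {
            roundTrip(new BrokenBag()).put(1L, "a");
        } catch (NullPointerException e) {
            System.out.println("BrokenBag: NullPointerException after deserialization");
        }
        FixedBag source = new FixedBag();
        source.put(1L, "a");
        FixedBag copy = roundTrip(source);
        copy.put(2L, "b");
        System.out.println("FixedBag size: " + copy.size()); // prints FixedBag size: 2
    }
}
```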
        
        Stephan Ewen added a comment:

        Okay, so the keyBy(1).fold(...) case can be fixed via env.registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class);

        The Window operator still fails if I do keyBy(1).timeWindow(Time.seconds(10)).fold(...).
        The bug is that serializer registrations are not properly forwarded.

        Fixing that...
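The failure mode described here, per-type registrations made on the environment that presumably never reach the serializer the window operator ends up using, can be sketched with a plain lookup table. `Registry`, `stateSerializerWithoutForwarding`, `stateSerializerWithForwarding`, and the string serializer names (including `"DEFAULT"`) are hypothetical simplifications, not Flink's `ExecutionConfig` or `KryoSerializer`; the sketch only shows why the lookup silently falls back to a default when registrations are not passed along.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistrationForwarding {

    // Hypothetical registry of per-type serializer choices (a stand-in for the execution config).
    static class Registry {
        private final Map<Class<?>, String> serializers = new HashMap<>();

        void register(Class<?> type, String serializerName) {
            serializers.put(type, serializerName);
        }

        // Returns the registered serializer name, or a generic default if none was registered.
        String serializerFor(Class<?> type) {
            return serializers.getOrDefault(type, "DEFAULT");
        }
    }

    // Buggy path: userRegistry is accepted but never consulted, so registrations are dropped.
    static String stateSerializerWithoutForwarding(Registry userRegistry, Class<?> stateType) {
        Registry fresh = new Registry();
        return fresh.serializerFor(stateType);
    }

    // Fixed path: the user's registry is forwarded to where the state serializer is chosen.
    static String stateSerializerWithForwarding(Registry userRegistry, Class<?> stateType) {
        return userRegistry.serializerFor(stateType);
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register(java.util.TreeMap.class, "JavaSerializer");
        System.out.println(stateSerializerWithoutForwarding(registry, java.util.TreeMap.class)); // prints DEFAULT
        System.out.println(stateSerializerWithForwarding(registry, java.util.TreeMap.class));    // prints JavaSerializer
    }
}
```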

        Stephan Ewen added a comment:

        Have a fix and nice tests. Waiting for CI to give the green light, then merging this fix.

        Stephan Ewen added a comment:

        Fixed in

        • 1.1.3 via 52a4440d916fb450c4999f6e1f42f392e247b426
        • 1.2.0 via 4d4eb64be7490672771243147824a70d3d47c501

          People

          • Assignee: Stephan Ewen
          • Reporter: Fabian Hueske
          • Votes: 0
          • Watchers: 4
