Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
1.1.2, 1.2.0
None
Description
The following program:

    DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));

    src
        .keyBy(1)
        .timeWindow(Time.minutes(5))
        .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
            @Override
            public TreeMultimap<Long, String> fold(
                    TreeMultimap<Long, String> topKSoFar,
                    Tuple2<String, Long> itemCount) throws Exception {
                String item = itemCount.f0;
                Long count = itemCount.f1;
                topKSoFar.put(count, item);
                if (topKSoFar.keySet().size() > 10) {
                    topKSoFar.removeAll(topKSoFar.keySet().first());
                }
                return topKSoFar;
            }
        });

throws this exception:
Caused by: java.lang.RuntimeException: Could not add value to folding state.
at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
... 6 more
The exception occurs because the initial value is not deserialized correctly and ends up being null.
The user reporting this issue said that using the same FoldFunction on a KeyedStream (without a window) works fine.
I tracked the problem down to the serialization of the StateDescriptor, i.e., its writeObject() and readObject() methods. These methods use Flink's TypeSerializers to serialize the default value. For the TreeMultimap this is the KryoSerializer, which fails to read the serialized data back for some reason.
A quick workaround would be to check whether the default value implements Serializable and, if so, use Java serialization. However, it would be good to track down the root cause of this problem.
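To make the proposed workaround concrete, here is a minimal sketch in plain Java. It is not the actual StateDescriptor code: the Descriptor class, its fields, and the roundTrip helper are hypothetical names made up for illustration, and a java.util.TreeMap stands in for the TreeMultimap so that the example has no Guava or Flink dependency. The idea is only that writeObject() checks whether the default value implements Serializable and, in that case, writes it with Java serialization rather than going through a TypeSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

public class DefaultValueSketch {

    /** Hypothetical stand-in for a state descriptor that carries a default value. */
    public static class Descriptor<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;
        // transient: the default value is written by hand in writeObject()
        private transient T defaultValue;

        public Descriptor(String name, T defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }

        public String getName() {
            return name;
        }

        public T getDefaultValue() {
            return defaultValue;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            if (defaultValue == null) {
                out.writeByte(0); // tag 0: no default value
            } else if (defaultValue instanceof Serializable) {
                out.writeByte(1); // tag 1: written with Java serialization
                out.writeObject(defaultValue);
            } else {
                // Assumption: the real code would fall back to the
                // TypeSerializer here; this sketch simply refuses.
                throw new NotSerializableException(defaultValue.getClass().getName());
            }
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            byte tag = in.readByte();
            defaultValue = (tag == 1) ? (T) in.readObject() : null;
        }
    }

    /** Serializes and deserializes an object in memory, as shipping it to a task would. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Descriptor<TreeMap<Long, String>> original =
                new Descriptor<>("window-fold", new TreeMap<Long, String>());
        Descriptor<TreeMap<Long, String>> copy = roundTrip(original);
        System.out.println("default value survived: " + (copy.getDefaultValue() != null));
    }
}
```

With this in place the fold function would receive a non-null initial value after the descriptor has been shipped. It only sidesteps the Kryo path for Serializable default values, so it does not explain why the KryoSerializer fails to read the data.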