Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Invalid
-
1.7.2, 1.9.2, 1.10.0
-
None
-
None
-
Can reproduce on the following configurations:
OS: macOS 10.14.3
Java: 1.8.0_202
OS: CentOS 7.2.1511
Java: 1.8.0_102
Description
Hi,
I encountered the following exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) at flink.bug.App.main(App.java:21) Caused by: java.lang.IllegalArgumentException at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876) at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36) at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748)
Code that reproduces the problem:
package flink.bug; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.windowing.time.Time; public class App { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.fromElements(1, 2) .map(Aggregate::new) .keyBy(Aggregate::getKey) .timeWindow(Time.seconds(2)) .reduce(Aggregate::reduce) .addSink(new CollectSink()); env.execute(); } private static class Aggregate { private Key key = new Key(); public Aggregate(long number) { } public static Aggregate reduce(Aggregate a, Aggregate b) { return new Aggregate(0); } public Key getKey() { return key; } } public static class Key { } private static class CollectSink implements SinkFunction<Aggregate> { private static final long serialVersionUID = 1; @SuppressWarnings("rawtypes") @Override public void invoke(Aggregate value, Context ctx) throws Exception { } } }
Attached is the project that can be executed with ./gradlew run showing the problem, or you can run the attached flink-bug-dist.zip which is prepackaged with the dependencies.
Thanks in advance
Attachments
Attachments
Issue Links
- causes
-
FLINK-16596 Support Enum-Values as part of a Key
- Open