Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-11774

Unstable hashCode causes IllegalArgumentException in HeapPriorityQueueSet

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Invalid
    • 1.7.2, 1.9.2, 1.10.0
    • None
    • None
    • Can reproduce on the following configurations:

       

      OS: macOS 10.14.3

      Java: 1.8.0_202

       

      OS: CentOS 7.2.1511

      Java: 1.8.0_102

    Description

      Hi,

      I encountered the following exception:

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
              at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
              at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
              at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
              at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
              at flink.bug.App.main(App.java:21)
      Caused by: java.lang.IllegalArgumentException
              at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
              at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
              at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
              at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
              at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
              at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
              at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
              at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
              at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
              at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
              at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
              at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
              at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
              at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
              at java.lang.Thread.run(Thread.java:748)
      

      Code that reproduces the problem:

      package flink.bug;
      
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.streaming.api.windowing.time.Time;
      
      public class App {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              env.setParallelism(2);
      
              env.fromElements(1, 2)
                  .map(Aggregate::new)
                  .keyBy(Aggregate::getKey)
                  .timeWindow(Time.seconds(2))
                  .reduce(Aggregate::reduce)
                  .addSink(new CollectSink());
      
              env.execute();
          }
      
          private static class Aggregate {
      
              private Key key = new Key();
      
              public Aggregate(long number) {
              }
      
              public static Aggregate reduce(Aggregate a, Aggregate b) {
                  return new Aggregate(0);
              }
      
              public Key getKey() {
                  return key;
              }
      
          }
      
          public static class Key {
          }
      
          private static class CollectSink implements SinkFunction<Aggregate> {
      
              private static final long serialVersionUID = 1;
      
              @SuppressWarnings("rawtypes")
              @Override
              public void invoke(Aggregate value, Context ctx) throws Exception {
              }
          }
      }
      

      Attached is the project that can be executed with ./gradlew run showing the problem, or you can run the attached flink-bug-dist.zip which is prepackaged with the dependencies.

      Thanks in advance

      Attachments

        1. flink-bug-src.zip
          59 kB
          Kirill Vainer
        2. flink-bug-dist.zip
          40.80 MB
          Kirill Vainer

        Issue Links

          Activity

            People

              Unassigned Unassigned
              kvainer Kirill Vainer
              Votes:
              4 Vote for this issue
              Watchers:
              13 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: