Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18332

Add error message to precondition in KeyGroupPartitionedPriorityQueue

    XMLWordPrintableJSON

Details

    Description

      in my case, the user custom a KeySelector and use a static SimpleDateFormat to format unix timestamp. sometimes job will throw an ArrayIndexOutOfBoundsException

      java.lang.ArrayIndexOutOfBoundsException: -49
      	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:174)
      	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:110)
      	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:203)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:901)
      	at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
      	at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:920)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:402)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
      	at java.lang.Thread.run(Thread.java:745)
      

      I reproduced this case.
      Because keySelector.getKey() will be called twice on the same record, and SimpleDateFormat is not thread safe, In the case of high concurrency and Cross Days, the results returned by the two calls of keySelector.getKey() may be different.
      So the keygroup calculated in the second execution is different from the result of the first calculation,then throw an ArrayIndexOutOfBoundsException.

      I think the error message should be clearer, not just the ArrayIndexOutOfBoundsException.

      Attachments

        Issue Links

          Activity

            People

              tartarus tartarus
              tartarus tartarus
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: