Flink / FLINK-17453

KryoSerializer throws IndexOutOfBoundsException for type java.util.PriorityQueue


Details

    Description

      We're using a SQL UDAF with a PriorityQueue as its accumulator, and the following error occurs when recovering from a checkpoint.

      2020-04-28 22:28:18,659 INFO  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer  - IndexOutOfBoundsException type java.util.PriorityQueue source data is: [2, 0, 0, 0, 0, 0, 0, 0].
      2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d).
      2020-04-28 22:28:18,660 INFO  org.apache.flink.runtime.taskmanager.Task                     - GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d) switched from RUNNING to FAILED.
      org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
              at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:967)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:941)
              at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.}
              at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
              ... 7 more
      Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
              at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
              at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
              at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:361)
              at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
              at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
              at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
              at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
              at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
              at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
              at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
              at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
              at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
              at org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction.getWindowAggregationResult(GeneralWindowProcessFunction.java:73)
              at org.apache.flink.table.runtime.operators.window.WindowOperator.emitWindowResult(WindowOperator.java:434)
              at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:422)
              at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
              at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
              ... 7 more
      Caused by: java.io.EOFException: No more bytes left.
              ... 26 more
      

      The problem happens when restoring from a checkpoint. (I don't know what the PriorityQueue contains because the job is already running in production.) According to the logs, the KryoSerializer appears unable to deserialize the PriorityQueue instance: it first logs an IndexOutOfBoundsException, and the task then fails with a KryoException caused by java.io.EOFException ("No more bytes left") while reading a class ID.
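      The root cause in the trace, "No more bytes left", means the deserializer needed more bytes than the serialized buffer contained (here while reading a class ID in DefaultClassResolver.readClass). The sketch below is NOT how Kryo encodes a PriorityQueue; it is only a standard-library analogy using a hypothetical length-prefixed format (the class TruncatedReadDemo and its byte layout are assumptions for illustration), showing how a reader that trusts what it has already decoded fails with java.io.EOFException when the buffer is shorter than expected.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical demo: not Flink or Kryo code, just an analogy for "No more bytes left".
public class TruncatedReadDemo {

    // Reads a hypothetical length-prefixed payload: a 4-byte big-endian int length,
    // followed by exactly that many payload bytes.
    static byte[] readLengthPrefixed(byte[] buffer) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
        int length = in.readInt();
        byte[] payload = new byte[length];
        // readFully throws EOFException if fewer than `length` bytes remain.
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // The header claims 8 payload bytes, but only 2 follow.
        byte[] truncated = {0, 0, 0, 8, 42, 43};
        try {
            readLengthPrefixed(truncated);
            System.out.println("read succeeded");
        } catch (EOFException e) {
            // prints "EOFException: expected 8 payload bytes, buffer ended early"
            System.out.println("EOFException: expected 8 payload bytes, buffer ended early");
        }
    }
}
```

      The same shape of failure appears whenever the reader's expectations (a class ID, a length, an element count) no longer match the bytes that were actually written, which is consistent with, but does not by itself prove, a serializer mismatch on restore.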

      The UDAF accumulator is:

      
      import java.util.Comparator;
      import java.util.Map;
      import java.util.PriorityQueue;

      // Inside the UDAF class (an AggregateFunction subclass):

      // Accumulator holding (key, count) entries in a priority queue.
      public static class Acc {
          public PriorityQueue<Map.Entry<String, Long>> queue;
      }

      @Override
      public Acc createAccumulator() {
          Acc accumulator = new Acc();
          // Order entries by value (ascending); break ties by key.
          Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
              @Override
              public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                  int byValue = o1.getValue().compareTo(o2.getValue());
                  return byValue == 0 ? o1.getKey().compareTo(o2.getKey()) : byValue;
              }
          };
          // Min-heap: the entry with the smallest value sits at the head of the queue.
          accumulator.queue = new PriorityQueue<>(comparator);
          return accumulator;
      }
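      To make the accumulator's ordering concrete, the following standalone sketch (not part of the original report) rebuilds the same comparator logic as a lambda and drains a queue filled with hypothetical entries. The class name AccOrderingDemo, the helper drainKeys, and the sample data are illustrative assumptions; no Flink or Kryo code is involved, so it shows only what the queue holds, not how it is serialized.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical demo of the accumulator's ordering, outside of Flink.
public class AccOrderingDemo {

    // Same logic as the UDAF's anonymous comparator: value ascending, then key ascending.
    static final Comparator<Map.Entry<String, Long>> BY_VALUE_THEN_KEY = (o1, o2) -> {
        int byValue = o1.getValue().compareTo(o2.getValue());
        return byValue == 0 ? o1.getKey().compareTo(o2.getKey()) : byValue;
    };

    // Polls the queue until empty and returns the keys in priority order.
    static List<String> drainKeys(PriorityQueue<Map.Entry<String, Long>> pq) {
        List<String> keys = new ArrayList<>();
        while (!pq.isEmpty()) {
            keys.add(pq.poll().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(BY_VALUE_THEN_KEY);
        pq.add(new AbstractMap.SimpleEntry<>("b", 2L));
        pq.add(new AbstractMap.SimpleEntry<>("a", 2L));
        pq.add(new AbstractMap.SimpleEntry<>("c", 1L));
        System.out.println(drainKeys(pq)); // prints [c, a, b]
    }
}
```

      The smallest count comes out first, and equal counts are ordered by key, so "a" precedes "b".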
      

      Attachments

        1. udaf (3 kB, uploaded by Jiayi Liao)


          People

            Assignee: Unassigned
            Reporter: Jiayi Liao (wind_ljy)
            Votes: 0
            Watchers: 5
