Details
Type: Bug
Status: Closed
Priority: Minor
Resolution: Invalid
Affects Version/s: 1.9.0
Fix Version/s: None
Description
We use a SQL UDAF whose accumulator holds a PriorityQueue. When the job recovers from a checkpoint, the following error occurs:
2020-04-28 22:28:18,659 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer - IndexOutOfBoundsException type java.util.PriorityQueue source data is: [2, 0, 0, 0, 0, 0, 0, 0].
2020-04-28 22:28:18,660 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d).
2020-04-28 22:28:18,660 INFO org.apache.flink.runtime.taskmanager.Task - GroupWindowAggregate -> Calc_select_live_id__2 -> SinkConversionToTupl -> Map -> Filter (37/40)- execution # 0 (4636858426452f0a437d2f6d9564f34d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:967)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:941)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
    ... 7 more
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:361)
    at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
    at org.apache.flink.table.dataformat.BinaryGeneric.getJavaObjectFromBinaryGeneric(BinaryGeneric.java:86)
    at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:628)
    at org.apache.flink.table.dataformat.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:633)
    at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:320)
    at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1293)
    at org.apache.flink.table.dataformat.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1257)
    at org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:302)
    at GroupingWindowAggsHandler$57.setAccumulators(Unknown Source)
    at org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction.getWindowAggregationResult(GeneralWindowProcessFunction.java:73)
    at org.apache.flink.table.runtime.operators.window.WindowOperator.emitWindowResult(WindowOperator.java:434)
    at org.apache.flink.table.runtime.operators.window.WindowOperator.onProcessingTime(WindowOperator.java:422)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
Caused by: java.io.EOFException: No more bytes left.
    ... 26 more
The problem occurs when restoring from a checkpoint. (I don't know what the PriorityQueue contained, because the job is already running in the production environment.) According to the logs, the KryoSerializer cannot deserialize the PriorityQueue instance: it reports an IndexOutOfBoundsException for type java.util.PriorityQueue, and the task then fails with a KryoException caused by java.io.EOFException: No more bytes left.
The UDAF accumulator is:
public static class Acc {
    public PriorityQueue<Map.Entry<String, Long>> queue;
}

@Override
public Acc createAccumulator() {
    Acc accumulator = new Acc();
    Comparator<Map.Entry<String, Long>> comparator = new Comparator<Map.Entry<String, Long>>() {
        @Override
        public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
            return o1.getValue().compareTo(o2.getValue()) == 0
                    ? o1.getKey().compareTo(o2.getKey())
                    : o1.getValue().compareTo(o2.getValue());
        }
    };
    PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(comparator);
    accumulator.queue = pq;
    return accumulator;
}
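For reference, here is a minimal standalone sketch of the ordering this accumulator's queue maintains: entries are ordered by value, with ties broken by key. It does not involve Flink or Kryo and does not reproduce the failure. The class name `AccOrderingSketch`, the helper `drainKeys`, and the use of `AbstractMap.SimpleEntry` are assumptions made for illustration; the report does not say which `Map.Entry` implementation the job actually puts into the queue.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper class, not part of the reported job.
class AccOrderingSketch {

    // Same ordering as the comparator in the report: by value, ties broken by key.
    static final Comparator<Map.Entry<String, Long>> BY_VALUE_THEN_KEY = (o1, o2) -> {
        int byValue = o1.getValue().compareTo(o2.getValue());
        return byValue != 0 ? byValue : o1.getKey().compareTo(o2.getKey());
    };

    // Puts the entries into a PriorityQueue and returns the keys in poll order.
    static List<String> drainKeys(List<Map.Entry<String, Long>> entries) {
        PriorityQueue<Map.Entry<String, Long>> queue = new PriorityQueue<>(BY_VALUE_THEN_KEY);
        queue.addAll(entries);
        List<String> keys = new ArrayList<>();
        while (!queue.isEmpty()) {
            keys.add(queue.poll().getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        // SimpleEntry is an assumed Map.Entry implementation for this sketch.
        entries.add(new AbstractMap.SimpleEntry<>("b", 2L));
        entries.add(new AbstractMap.SimpleEntry<>("c", 1L));
        entries.add(new AbstractMap.SimpleEntry<>("a", 2L));
        System.out.println(drainKeys(entries)); // prints [c, a, b]
    }
}
```

Note that the comparator instance is part of the queue's state, so whatever serializer handles the accumulator has to deal with the comparator as well as the entries.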