Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
1.6.1
-
Flink 1.6.0
Description
Our online Flink Job run about a week,job contain sql :
select `time`, lower(trim(os_type)) as os_type, count(distinct feed_id) as feed_total_view from my_table group by `time`, lower(trim(os_type))
then occur NPE:
java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109) at NonWindowedAggregationHelper$894.retract(Unknown Source) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39) at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745)
View DistinctAccumulator.remove
this NPE should currentCnt = null lead to, so we simple handle like :
def remove(params: Row): Boolean = { if(!distinctValueMap.contains(params)){ true }else{ val currentCnt = distinctValueMap.get(params) // if (currentCnt == null || currentCnt == 1) { distinctValueMap.remove(params) true } else { var value = currentCnt - 1L if(value < 0){ value = 1 } distinctValueMap.put(params, value) false } } }
Update:
Because state clean up happens in processing time, it might be
the case that retractions are arriving after the state has
been cleaned up. Before these changes, a new accumulator was
created and invalid retraction messages were emitted. This
change drops retraction messages for which no accumulator
exists.