Flink / FLINK-10674

Fix handling of retractions after clean up


    Details

      Description

      Our production Flink job had been running for about a week. The job contains the following SQL:

      select  `time`,  
              lower(trim(os_type)) as os_type, 
              count(distinct feed_id) as feed_total_view  
      from  my_table 
      group by `time`, lower(trim(os_type))

      Then the following NullPointerException occurred:

      java.lang.NullPointerException
          at scala.Predef$.Long2long(Predef.scala:363)
          at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
          at NonWindowedAggregationHelper$894.retract(Unknown Source)
          at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
          at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
          at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
          at java.lang.Thread.run(Thread.java:745)

      Looking at DistinctAccumulator.remove, the NPE appears to be caused by currentCnt being null, so we simply handle it like this:

      def remove(params: Row): Boolean = {
        if (!distinctValueMap.contains(params)) {
          // No entry for this value: nothing to retract.
          true
        } else {
          val currentCnt = distinctValueMap.get(params)
          // Guard against a null count before it is unboxed.
          if (currentCnt == null || currentCnt == 1) {
            distinctValueMap.remove(params)
            true
          } else {
            var value = currentCnt - 1L
            // Defensive clamp: never store a negative count.
            if (value < 0) {
              value = 1
            }
            distinctValueMap.put(params, value)
            false
          }
        }
      }
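
For reference, the failure mode in the stack trace (the `scala.Predef$.Long2long` frame) can be reproduced without Flink. The following is a minimal sketch, assuming the lookup returns a null `java.lang.Long` for a missing key, as `java.util.Map.get` does. The object name `UnboxNullDemo`, the method `decrement`, and the keys are hypothetical and only serve the illustration; this is not the code of `DistinctAccumulator`.

```scala
import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}
import scala.util.Try

object UnboxNullDemo {
  // Hypothetical stand-in for the accumulator's map; a cleaned-up key is simply absent.
  val counts = new JHashMap[String, JLong]()

  // Mirrors the failing pattern: get() returns null for a missing key, and the
  // subtraction forces an implicit unboxing through Predef.Long2long.
  def decrement(key: String): Long = {
    val currentCnt: JLong = counts.get(key)
    currentCnt - 1L
  }

  def main(args: Array[String]): Unit = {
    counts.put("feed-1", 2L)
    println(decrement("feed-1"))       // key present: unboxing succeeds
    println(Try(decrement("missing"))) // key absent: a Failure wrapping the NPE
  }
}
```

A null check before the subtraction, as in the proposal above, avoids the unboxing of a null value.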

       

      Update:

      Because state clean up happens in processing time, it might be
      the case that retractions are arriving after the state has
      been cleaned up. Before these changes, a new accumulator was
      created and invalid retraction messages were emitted. This
      change drops retraction messages for which no accumulator
      exists. 
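
The behavior described in the update can be sketched roughly as follows. This is not the actual patch: it is a simplified model that assumes a plain mutable map in place of Flink's keyed state, and the names `RetractionAfterCleanup`, `process`, and `cleanup` are hypothetical. It only illustrates the rule that a retraction arriving for a key without an accumulator is dropped instead of creating a fresh accumulator.

```scala
import scala.collection.mutable

object RetractionAfterCleanup {
  // Hypothetical stand-in for keyed state: group key -> (distinct value -> reference count).
  type Acc = mutable.Map[String, Long]
  val state: mutable.Map[String, Acc] = mutable.Map.empty

  /** Returns Some(distinct count) to emit, or None if the message is dropped. */
  def process(key: String, value: String, isRetract: Boolean): Option[Int] =
    state.get(key) match {
      case None if isRetract =>
        // No accumulator exists (e.g. it was cleaned up): drop the retraction
        // rather than creating a new accumulator and emitting an invalid result.
        None
      case maybeAcc =>
        // Reuse the existing accumulator, or create one for an accumulate message.
        val acc = maybeAcc.getOrElse {
          val fresh: Acc = mutable.Map.empty
          state(key) = fresh
          fresh
        }
        if (isRetract) {
          acc.get(value) match {
            case Some(cnt) if cnt > 1 => acc(value) = cnt - 1
            case Some(_)              => acc -= value
            case None                 => // value unknown: nothing to retract
          }
        } else {
          acc(value) = acc.getOrElse(value, 0L) + 1L
        }
        Some(acc.size)
    }

  // Simulates the processing-time state cleanup for one key.
  def cleanup(key: String): Unit = state.remove(key)
}
```

In this model, calling `cleanup("k")` and then `process("k", "b", isRetract = true)` returns `None` and leaves the state empty, which corresponds to dropping the late retraction.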

            People

            • Assignee: Timo Walther (twalthr)
            • Reporter: ambition