FLINK-10674

Fix handling of retractions after clean up


Details

    Description

      Our production Flink job had been running for about a week. The job contains the following SQL:

      select  `time`,  
              lower(trim(os_type)) as os_type, 
              count(distinct feed_id) as feed_total_view  
      from  my_table 
      group by `time`, lower(trim(os_type))

       

      It then failed with a NullPointerException:

       

      java.lang.NullPointerException
          at scala.Predef$.Long2long(Predef.scala:363)
          at org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
          at NonWindowedAggregationHelper$894.retract(Unknown Source)
          at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
          at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
          at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
          at java.lang.Thread.run(Thread.java:745)

       

       

      Looking at DistinctAccumulator.remove, the NPE appears to be caused by currentCnt being null, so we applied a simple workaround:

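      def remove(params: Row): Boolean = {
        if (!distinctValueMap.contains(params)) {
          // No entry for this value: there is nothing to decrement.
          true
        } else {
          val currentCnt = distinctValueMap.get(params)
          // Guard against a null count, which is what triggered the NPE in Long2long.
          if (currentCnt == null || currentCnt == 1) {
            distinctValueMap.remove(params)
            true
          } else {
            var value = currentCnt - 1L
            // Defensive: keep the stored count from going negative.
            if (value < 0) {
              value = 1
            }
            distinctValueMap.put(params, value)
            false
          }
        }
      }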
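
      The same guard can be sketched outside Flink. The version below is only an illustration under stated assumptions: `DistinctCounterSketch` is a hypothetical class, a plain `scala.collection.mutable.Map` stands in for the accumulator's map state, and `String` keys stand in for `Row`. It looks the count up as an `Option`, so a missing entry is never unboxed. Unlike the snippet above, it removes any entry whose count is not greater than 1 instead of resetting it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for DistinctAccumulator: a plain mutable map replaces
// Flink's map state, and String keys replace Row.
class DistinctCounterSketch {
  private val distinctValueMap = mutable.Map.empty[String, Long]

  // Returns true if this is the first occurrence of the value.
  def add(value: String): Boolean = {
    val newCnt = distinctValueMap.getOrElse(value, 0L) + 1L
    distinctValueMap.update(value, newCnt)
    newCnt == 1L
  }

  // Returns true if no occurrence of the value remains afterwards.
  // A missing entry is handled explicitly instead of being unboxed.
  def remove(value: String): Boolean =
    distinctValueMap.get(value) match {
      case Some(cnt) if cnt > 1L =>
        distinctValueMap.update(value, cnt - 1L)
        false
      case _ =>
        // Count is 1 (or less), or the value is unknown: drop any entry.
        distinctValueMap.remove(value)
        true
    }
}
```

      With this sketch, calling `remove` for a value that was never added (or whose entry has already been cleared) returns `true` without throwing.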

       

      Update:

      Because state cleanup is triggered in processing time, retractions
      can arrive after the state has already been cleaned up. Before this
      change, a new accumulator was created in that case and invalid
      retraction messages were emitted. This change drops retraction
      messages for which no accumulator exists.
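
      The behavior described in the update can be modeled roughly as follows. This is a simplified sketch, not the actual change to GroupAggProcessFunction: `Change`, `GroupCountSketch`, `cleanUp`, and `process` are hypothetical names, and a mutable map keyed by `String` stands in for keyed state holding a count accumulator.

```scala
import scala.collection.mutable

// Hypothetical model of a grouped COUNT operator; none of these names are Flink APIs.
final case class Change(key: String, isAccumulate: Boolean)

class GroupCountSketch {
  // Per-key accumulators; removing an entry models state cleanup.
  private val state = mutable.Map.empty[String, Long]

  def cleanUp(key: String): Unit = state.remove(key)

  // Returns the updated count to emit, or None if the input is dropped.
  def process(change: Change): Option[Long] =
    state.get(change.key) match {
      case None if !change.isAccumulate =>
        // Retraction for a key without an accumulator: drop it instead of
        // creating a fresh accumulator and emitting an invalid result.
        None
      case existing =>
        val delta = if (change.isAccumulate) 1L else -1L
        val updated = existing.getOrElse(0L) + delta
        if (updated == 0L) state.remove(change.key)
        else state.update(change.key, updated)
        Some(updated)
    }
}
```

      In this model, a retraction that arrives after `cleanUp` yields `None`, while a later accumulate message for the same key starts again from a fresh count.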


          People

            twalthr Timo Walther
            ambition ambition