  1. Flink
  2. FLINK-26408

retract a non-existent record in RetractableTopNFunction


Details

    Description

      RetractableTopNFunction throws a RuntimeException when both of the following hold:

      1. the sorted map state, ValueState<SortedMap<RowData, Long>> treeMap, is not empty;
      2. the sorted map does not contain the current sort key.
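
The two conditions can be read as the last branch of a retraction handler. The following is a simplified Python model, not the Flink implementation: the function name `retract` and the plain dict standing in for the `treeMap` state are hypothetical, and treating an empty map as a tolerated no-op is an assumption of this sketch.

```python
def retract(sorted_map, sort_key):
    """Apply a retraction for sort_key to a {sort_key: count} map (model of treeMap)."""
    if sort_key in sorted_map:
        # Normal case: decrement the count and drop the key when it reaches zero.
        sorted_map[sort_key] -= 1
        if sorted_map[sort_key] == 0:
            del sorted_map[sort_key]
        return "retracted"
    if not sorted_map:
        # Empty state (for example, cleared by TTL): assumed to be tolerated here.
        return "ignored"
    # Non-empty map that lacks the sort key: the failing case described in this issue.
    raise RuntimeError("cannot retract a non-existent record")
```

For example, `retract({5: 1}, 3)` raises, while `retract({}, 3)` returns `"ignored"` in this model.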

      We have the following Flink SQL job:

      // table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
      
      select
      a_key,a_time,a_jk,b_key,b_time,b_jk
      from
      (
          select
          a_key,a_time,a_jk,b_key,b_time,b_jk,
          row_number() over(partition by a_key order by a_time desc) as rn
          from
          (
              select a_key, a_time, a_jk
              from (
                  select * , row_number() over(partition by a_key order by a_time desc) as rn
                  from table_a
              ) tmp1
              where rn = 1
          ) t1
          left join
          (
              select b_key, b_time, b_jk
              from (
                  select * , row_number() over(partition by b_key order by b_time desc) as rn
                  from table_b
              ) tmp2
              where rn = 1
          ) t2
          on t1.a_jk = t2.b_jk
      ) t3
      where rn = 1

      The JobGraph looks like this:

      Source table_a  -->  Rank_a  --\
                                      +-->  Join  -->  Final Rank
      Source table_b  -->  Rank_b  --/

      Suppose we have the following input:

      ts  SourceA               SourceB               RankA                 RankB                 Join                       Final Rank
          (a_key,a_time,a_jk)   (b_key,b_time,b_jk)   (a_key,a_time,a_jk)   (b_key,b_time,b_jk)   (a_key,b_key,a_time,a_jk)  (a_key,b_key,a_time)
      t1                        +(b1,1,jk1)                                 +(b1,1,jk1)
      t2                        +(b2,2,jk2)                                 +(b2,2,jk2)
      t3  +(a1,3,jk1)                                 +(a1,3,jk1)                                 +(a1,b1,3,jk1)             +(a1,b1,3)
      t4  +(a1,4,jk1)                                 -(a1,3,jk1)                                 -(a1,b1,3,jk1)             -(a1,b1,3)
                                                      +(a1,4,jk1)                                 +(a1,b1,4,jk1)             +(a1,b1,4)
      t5  +(a1,5,jk2)                                 -(a1,4,jk1)                                 -(a1,b1,4,jk1)             -(a1,b1,4)
                                                      +(a1,5,jk2)                                 +(a1,b2,5,jk2)             +(a1,b2,5)
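
The RankA column can be reproduced with a small model of a "latest row per key" rank (the `row_number() ... where rn = 1` pattern in the query). This is a sketch only: `top1_changelog` is a hypothetical helper, rows are plain `(key, time, jk)` tuples, and rows that are not strictly newer than the current rank-1 row are assumed to leave the output unchanged.

```python
def top1_changelog(rows):
    """Emit the changelog of a 'latest row per key' rank for (key, time, jk) rows."""
    best = {}  # key -> current rank-1 row
    out = []
    for row in rows:
        key, time, _ = row
        prev = best.get(key)
        if prev is None:
            out.append(("+", row))   # first row for this key
        elif time > prev[1]:
            out.append(("-", prev))  # retract the old rank-1 row
            out.append(("+", row))   # emit the new rank-1 row
        else:
            continue                 # not newer: rank 1 is unchanged
        best[key] = row
    return out


log = top1_changelog([("a1", 3, "jk1"), ("a1", 4, "jk1"), ("a1", 5, "jk2")])
```

Here `log` contains one insert for t3 and a retract/insert pair for each of t4 and t5, matching the RankA column.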
                   

       

      Assume:

      1. t4 and t5 occur at almost the same time, so the Join operator produces 4 messages across t4 and t5. Because the hash key changes (from jk1 to jk2), +(a1,b2,5,jk2), which is hashed by jk2, may run on a different task from the other 3 messages (hashed by jk1), and it may arrive at Final Rank before them.
      2. Due to network congestion, high machine load, or similar, the messages produced by the Join operator at t4 and t5 take a while to reach Final Rank. By the time Final Rank receives them, its state has expired because of State TTL, and the treeMap state has been cleared.
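
The first assumption rests on records being routed by a hash of the join key. A rough illustration follows, assuming a deterministic CRC32 hash as a stand-in for Flink's actual key-group assignment; `subtask_for` and the parallelism of 4 are hypothetical.

```python
import zlib


def subtask_for(join_key, parallelism):
    """Pick a subtask from a deterministic hash of the join key (stand-in hash)."""
    return zlib.crc32(join_key.encode("utf-8")) % parallelism


# The four Join outputs produced across t4 and t5: (op, (a_key, b_key, a_time, a_jk)).
messages = [
    ("-", ("a1", "b1", 3, "jk1")),
    ("+", ("a1", "b1", 4, "jk1")),
    ("-", ("a1", "b1", 4, "jk1")),
    ("+", ("a1", "b2", 5, "jk2")),
]
routes = [subtask_for(row[3], parallelism=4) for _, row in messages]
```

The three jk1 messages always share a subtask and therefore keep their relative order. The jk2 message is routed independently, so nothing orders it relative to the other three once it lands on a different subtask.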

      Now if +(a1,b2,5,jk2) arrives at Final Rank first, the sortedMap for partition key a1 stores sort value 5. When -(a1,b1,3,jk1) then arrives at Final Rank, the operator finds that the sortedMap is not empty and does not contain sort key value 3, which meets the conditions for the RuntimeException.
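
The effect of the arrival order can be replayed against an empty (TTL-cleared) state. This is a simplified model under stated assumptions, not the operator's code: `replay` is a hypothetical helper, the state is a plain `{sort_key: count}` dict, and a retraction against an empty map is assumed to be skipped.

```python
def replay(messages):
    """Apply (op, sort_key) messages to an initially empty {sort_key: count} map."""
    sorted_map = {}
    for op, sort_key in messages:
        if op == "+":
            sorted_map[sort_key] = sorted_map.get(sort_key, 0) + 1
        elif sort_key in sorted_map:
            sorted_map[sort_key] -= 1
            if sorted_map[sort_key] == 0:
                del sorted_map[sort_key]
        elif sorted_map:
            # Non-empty map without the sort key: the failure in this issue.
            raise RuntimeError(f"retract of non-existent sort key {sort_key}")
        # Otherwise the map is empty and the retraction is skipped (assumption).
    return sorted_map


# Sort keys (a_time) of the four Join outputs, in emission order and reordered.
in_order = [("-", 3), ("+", 4), ("-", 4), ("+", 5)]
reordered = [("+", 5), ("-", 3), ("+", 4), ("-", 4)]
```

In this model `replay(in_order)` finishes with only sort key 5 in the map, while `replay(reordered)` raises on the second message, because sort key 5 is already present when the retraction for 3 arrives.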

      We hit this exception in our production environment (Flink version 1.12.2). It is very serious: once it happens, the job cannot recover automatically because the state is polluted.

          People

            Assignee: Unassigned
            Reporter: Hengyu Dai (hengyu.dai)