Apache Hudi / HUDI-6758

Avoid duplicated log blocks on the LogRecordReader


Details

    Description

      Due to Spark retries, duplicate log blocks can be added during a write. Since the writer does not delete anything during marker-based reconciliation, the reader can see these duplicate log blocks. For most payload implementations this is not an issue, but for the expression payload it can cause data inconsistency, because an expression (e.g. colA*2) could be evaluated twice.

       

      Here is the proposal for the fix:

       
      With Spark retries, duplicate log blocks can be created, so we need to add some kind of block identifier to every block. We already have a header with the key "INSTANT_TIME"; in addition we can add "BLOCK_SEQUENCE_NO". This is a simple counter that starts at 0 for a given file slice (append handle) and keeps increasing across log file rollovers. On a Spark retry, the counter starts again from 0 for that file slice, because for a given commit and file slice a task or stage retry re-attempts the entire file slice. It is therefore safe to use a simple sequence number; we don't need to rely on the taskId etc. to come up with block identifiers. For example, if a file slice is supposed to get 3 log files due to log file size rollovers, and the first attempt created only 2 of them, the second attempt will have to re-create all 3.
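The writer-side numbering described above can be sketched as follows. This is a minimal illustration, not Hudi's implementation: `AppendHandleSketch` and its fields are hypothetical names, and the first sequence number is left as a parameter (0 here, matching the scenarios in this description).

```python
from dataclasses import dataclass, field
from typing import Dict, List

INSTANT_TIME = "INSTANT_TIME"            # existing header key
BLOCK_SEQUENCE_NO = "BLOCK_SEQUENCE_NO"  # proposed header key


@dataclass
class AppendHandleSketch:
    """Hypothetical stand-in for one write attempt on one file slice."""
    instant_time: str
    first_seq_no: int = 0  # assumption: starting value of the counter
    headers_written: List[Dict[str, str]] = field(default_factory=list)

    def append_block(self) -> Dict[str, str]:
        # The counter depends only on how many blocks this attempt has
        # written so far, so it keeps increasing across log file rollovers.
        seq_no = self.first_seq_no + len(self.headers_written)
        header = {INSTANT_TIME: self.instant_time,
                  BLOCK_SEQUENCE_NO: str(seq_no)}
        self.headers_written.append(header)
        return header


# A retry builds a fresh handle for the same commit and file slice,
# so its numbering starts over.
attempt1 = AppendHandleSketch("001")
attempt1.append_block()
attempt1.append_block()
attempt2 = AppendHandleSketch("001")
attempt2.append_block()
```

Because the counter lives in the per-attempt handle, no task or attempt ID is needed to tell the attempts apart; the reader only sees the numbering restart.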
       
      On the reader side, if duplicate blocks are detected with the same commit time and the same block sequence number, we should group the blocks into sequences and, if more than one sequence is found, pick the longest one. Corrupt blocks should be ignored during this reconciliation as well. We should account for backwards compatibility, where the block sequence number may not be present. For rollback blocks, we can statically set the sequence number to 1, since there should be only one log block per file slice.
       

      Different scenarios:
      As called out above, we will go with a simple Block_Sequence_No (BSN) that starts at 0 and increments for a given file slice. The crux is how the reader reconciles duplicate log files created by Spark task failures. The high-level logic is: the reader finds all sequences of BSNs and picks the longest one. We do one pass over every log block (which we already do today) to parse the header info without deserializing the actual content, determine the valid blocks, and ignore the spurious ones.
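The reader-side logic above can be sketched roughly as below. Everything here is illustrative: `BlockHeader`, `split_into_sequences` and `reconcile` are hypothetical names, the input is assumed to be the already-parsed headers of one file slice in log order, and returning all blocks untouched when any sequence number is missing is only one possible way to handle older log blocks.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class BlockHeader:
    log_file: str
    instant_time: str
    seq_no: Optional[int]  # None models an older block without a BSN


def split_into_sequences(blocks: List[BlockHeader]) -> List[List[BlockHeader]]:
    """Cut the ordered blocks wherever the BSN stops increasing (a new attempt)."""
    sequences: List[List[BlockHeader]] = []
    for block in blocks:
        if not sequences or block.seq_no <= sequences[-1][-1].seq_no:
            sequences.append([])
        sequences[-1].append(block)
    return sequences


def reconcile(headers: List[BlockHeader]) -> List[BlockHeader]:
    """Return the blocks to read; the rest are treated as spurious."""
    by_commit: Dict[str, List[BlockHeader]] = {}
    for header in headers:
        by_commit.setdefault(header.instant_time, []).append(header)

    valid: List[BlockHeader] = []
    for blocks in by_commit.values():
        if any(b.seq_no is None for b in blocks):
            valid.extend(blocks)  # assumption: no reconciliation without BSNs
            continue
        best: List[BlockHeader] = []
        for seq in split_into_sequences(blocks):
            if len(seq) >= len(best):  # ">=" keeps the latest sequence on a tie
                best = seq
        valid.extend(best)
    return valid


# First attempt wrote 2 blocks, the retry wrote 3.
headers = [BlockHeader(f"lf{i + 1}", "001", bsn)
           for i, bsn in enumerate([0, 1, 0, 1, 2])]
print([h.log_file for h in reconcile(headers)])  # ['lf3', 'lf4', 'lf5']
```

Only header fields are inspected, which fits the single pass over block headers that the reader already performs.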
       

      Scenario 1:

      Happy path, no retries.
      log_file1(BSN:0), log_file2(BSN:1) added.
      Reader:
      Finds only one BSN sequence, (0,1), and marks both log_file1 and log_file2 as valid.
       

      Scenario 2:

      Task failed and was retried; the 1st attempt did not create the entire list of log files.
      attempt1:
      log_file1(BSN:0), log_file2(BSN:1) added.
      attempt2:
      log_file3(BSN:0), log_file4(BSN:1), log_file5(BSN:2) added. // BSN starts at 0 every time for a given file slice.
       
      Reader:
      Finds two sequences of block sequence numbers, (0,1) and (0,1,2).
      It chooses lf3, lf4 and lf5 as valid and ignores lf1 and lf2 as spurious.
       

      Scenario 3:

      Task failed and was retried; both attempts created the full list (possibly due to Spark speculation).
      attempt1:
      log_file1(BSN:0), log_file2(BSN:1) added.
      attempt2:
      log_file3(BSN:0), log_file4(BSN:1) added.
       
      Reader:
      Finds two sequences of block sequence numbers, (0,1) and (0,1).
      Since both sequences have the same length, we will probably pick the latest set: lf3 and lf4 are chosen as valid, and lf1 and lf2 are ignored as spurious.
       

      Scenario 4:

      Task failed and was retried; only the 1st attempt has the full list of log files.
      attempt1:
      log_file1(BSN:0), log_file2(BSN:1) added.
      attempt2:
      log_file3(BSN:0) added.
       
      Reader:
      Finds two sequences of block sequence numbers, (0,1) and (0).
      It chooses lf1 and lf2 as valid and ignores lf3 as spurious.
       
      The same logic should work for HDFS as well. Since log blocks are ordered by log file name with versions, the ordering should be intact, i.e. log files/log blocks created by the 2nd attempt should not interleave with log blocks from the 1st attempt. If log blocks are within the same log file, they will be ordered for sure. If they are across log files, the log version number ordering should suffice (which already happens).
       
       
      Scenario 3 with HDFS:
      Task failed and was retried; both attempts created the full list (possibly due to Spark speculation), but one of the attempts has partial writes (corrupt blocks).
      attempt1:
      log_file1, block1(BSN:0), log_file1, block2(BSN:1) added, but block2 is a corrupt block due to a partial write.
      attempt2:
      log_file1, block3(BSN:0), log_file2, block4(BSN:1) added.
       
      Reader:
      Finds two sequences of block sequence numbers.
      Since lf1, block2 is a corrupt block, the valid sequences deduced are (0) and (0,1). And so we choose lf1, block3 and lf2, block4 as valid and ignore lf1, block1 and lf1, block2 as invalid.
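The corrupt-block case can be worked through with a small sketch. It assumes corrupt blocks are simply dropped before sequences are formed, which matches the outcome described here but is not necessarily how the reader will implement it; `Block` and `valid_blocks` are hypothetical names.

```python
from typing import List, NamedTuple


class Block(NamedTuple):
    log_file: str
    name: str
    seq_no: int
    corrupt: bool = False  # set when the block was only partially written


def valid_blocks(blocks: List[Block]) -> List[Block]:
    """blocks: all blocks of one commit and file slice, in log order."""
    # Assumption: corrupt blocks never take part in any sequence.
    readable = [b for b in blocks if not b.corrupt]

    # A BSN that does not increase marks the start of another attempt.
    runs: List[List[Block]] = []
    for b in readable:
        if not runs or b.seq_no <= runs[-1][-1].seq_no:
            runs.append([])
        runs[-1].append(b)

    # Longest run wins; on a tie the later run replaces the earlier one.
    best: List[Block] = []
    for run in runs:
        if len(run) >= len(best):
            best = run
    return best


blocks = [
    Block("lf1", "block1", 0),
    Block("lf1", "block2", 1, corrupt=True),
    Block("lf1", "block3", 0),
    Block("lf2", "block4", 1),
]
print([(b.log_file, b.name) for b in valid_blocks(blocks)])
# [('lf1', 'block3'), ('lf2', 'block4')]
```

Dropping block2 leaves the sequences (0) and (0,1), so the second attempt wins. A corrupt block in the middle of an otherwise longer sequence is not covered by this description and is not handled specially in the sketch.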
       

       

          People

            shivnarayan sivabalan narayanan