Apache Hudi / HUDI-4907

Single commit multiple instant causing parquet files to be in wrong states


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.1
    • Component/s: None

    Description

      Code for reproducing

      Flink SQL Code

      CREATE TABLE input_table (
          `val`               STRING
          ,`event_time`       TIMESTAMP(3)
          ,`partition`        BIGINT
          ,`offset`           BIGINT
      ) WITH (
          'connector' = 'datagen',
          'fields.val.length' = '99999',
          'rows-per-second' = '15000'
      );

      CREATE TABLE test_hudi
      (
          `val`                 STRING
          ,`event_time`       TIMESTAMP(3)
          ,`partition`        BIGINT
          ,`offset`           BIGINT
          ,`dt`               STRING
          ,`hh`               STRING
      ) PARTITIONED BY (dt, hh)
      WITH (
          'connector' = 'hudi',
          'path' = 'hdfs://jm_tm_sync_error/',
          'table.type' = 'COPY_ON_WRITE',
          'write.operation' = 'insert',
          'hoodie.parquet.small.file.limit' = '104857600',
          'hoodie.parquet.max.file.size' = '268435456',
          'hoodie.datasource.write.recordkey.field' = 'partition,offset',
          'hoodie.datasource.write.hive_style_partitioning' = 'true',
          'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
          'write.bulk_insert.sort_input' = 'false',
          'index.bootstrap.enabled' = 'false',
          'index.state.ttl' = '60',
          'index.type' = 'FLINK_STATE',
          'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
          'write.tasks' = '8',
          'hive_sync.enable' = 'false'
      );

      insert into test_hudi
      select  `val`
              ,`event_time`
              ,`partition`
              ,`offset`
              ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
              ,DATE_FORMAT(event_time, 'HH')
       from input_table; 

       

      Advanced Properties

      execution.checkpointing.interval=60000ms 

       

      Job Profile Properties

      flink.version=1.13.14
      default.parallelism=8
      restart.from.savepoint=true
      sql.job.mode=normal
      running.mode=streaming
      slots.per.tm=2
      cpu.per.tm=2vcore
      memory.per.tm=6G
      jvm.heap.ratio=70% 

       

       

      Issues

      There are 2 main issues here:

      1. TM failing + starting a TM in a new container causing deadlock
      2. Single commit multi-instant causing various base file parquet errors
        • java.io.FileNotFoundException: File does not exist: 20220712165157207.parquet (inode 1234567890) Holder DFSClient_NONMAPREDUCE_1111111111_22 does not have any open files.
        • java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
        • java.lang.RuntimeException: 20220805210423582.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [1, 0, -38, 2] 

       

      TM failing + starting a TM in a new container causing deadlock 

      1. When a TM fails and a new TM is started and restored in a new container, a deadlock situation is created:
        • the TM is waiting for the JM to create a new INFLIGHT instant, and
        • the JM is waiting for the TM to send a successful WriteMetadataEvent
      2. The deadlock above will cause either of the errors below:
        • org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
        • org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
      3. This will trigger `org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.`
      4. JM will try to restore itself from the last successful checkpoint
      5. This will cause the next issue (illustrated below)

      Root cause

      When restoring the TM, `AbstractStreamWriteFunction#initializeState()` will attempt to restore the state of the TM. At this stage, `this.currentInstant` will be initialized by invoking `lastPendingInstant()`, in which the ckp metadata path is loaded and an INFLIGHT instant is returned.

       

      When invoking `instantToWrite()`, `instant.equals(this.currentInstant)` will always be true as the local `instant` is equal to `this.currentInstant`. Hence, the current implementation will be stuck in an infinite loop: `lastPendingInstant()`, which governs both `instant` and `this.currentInstant`, will always return the same value because the state of the ckp metadata path is never changed.
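
      The waiting behaviour can be sketched as below. This is a simplified Java model of the loop described above, not the actual Hudi source; the supplier, back-off and timeout are illustrative (the timeout mirrors the `Timeout(601000ms)` error quoted earlier).

      class InstantToWriteSketch {
          // Simplified model of the blocking loop in instantToWrite(). The
          // supplier re-reads the ckp metadata path on every call; if the JM
          // never starts a new instant, the loop can only exit via timeout.
          static String instantToWrite(String currentInstant,
                                       java.util.function.Supplier<String> lastPendingInstant) {
              long start = System.currentTimeMillis();
              long timeoutMs = 601_000L;
              String instant = lastPendingInstant.get();
              // After recovery, `instant` equals the restored `currentInstant` and the
              // ckp metadata path never changes, so this loop makes no progress.
              while (instant == null || instant.equals(currentInstant)) {
                  if (System.currentTimeMillis() - start > timeoutMs) {
                      throw new RuntimeException(
                          "Timeout(" + timeoutMs + "ms) while waiting for instant initialize");
                  }
                  try {
                      Thread.sleep(100L); // back off before re-reading the metadata path
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw new RuntimeException(e);
                  }
                  instant = lastPendingInstant.get();
              }
              return instant;
          }
      }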

       

      This happens because the JM is waiting for the TM to finish writing the batch for the INFLIGHT instant, while at the same time the TM is waiting for the JM to create a new INFLIGHT instant, hence the deadlock.

       

      The fix is to add additional logic to handle this case, ensuring that a TM can obtain a correct INFLIGHT instant when recovering (sketched below).
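
      A hedged sketch of such a guard follows. The method name and the `recovering`/`hasBufferedData` flags are assumptions based on this report (and the `instant.equals(this.currentInstant) && hasData` check mentioned later), not the exact patch: a recovering TM may reuse the restored pending instant instead of blocking until the JM starts a new one.

      class RecoveryGuardSketch {
          // Decide whether a recovering TM may reuse the restored pending instant
          // instead of waiting (and deadlocking) for the JM to start a new one.
          static boolean canReuseRestoredInstant(String restoredInstant,
                                                 String lastPending,
                                                 boolean recovering,
                                                 boolean hasBufferedData) {
              // On recovery the restored instant is still INFLIGHT in the ckp
              // metadata path; if there is buffered data to flush for it, reuse it.
              return recovering
                  && restoredInstant != null
                  && restoredInstant.equals(lastPending)
                  && hasBufferedData;
          }
      }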

       

      Single commit multi-instant

      1. JM restores state from a previously successful checkpoint
      2. Ckp metadata path is tainted with multiple INFLIGHTs
      3. A synchronisation issue occurs if the TM executes `lastPendingInstant()` before the JM executes `startInstant()`
      4. The single commit multi-instant issue occurs
      5. When a tainted TM obtains the wrong INFLIGHT instant, it will start a new write cycle with the correct INFLIGHT instant while a checkpoint is being performed.
      6. `reconcileAgainstMarkers()` will delete the files that tainted TMs are writing with the correct INFLIGHT instant in the next cycle, causing FileNotFoundException, COLUMN state errors and parquet corruption errors.

       

      This is caused by a synchronisation issue: the Task Manager (TM) runs `ckpMetaData#lastPendingInstant()` before the Job Manager (JM) executes `StreamWriteOperatorCoordinator#startInstant()`, causing the TM to fetch an old instantTime.

       

      Since ABORTED statuses are not written to the metadata path, the TM will fetch a previously aborted instant, thinking that it is still INFLIGHT.

      Hence, the new commit will contain files of 2 instants should there be a restart AND the timeline contains an INCOMPLETE instant (ABORTED/INFLIGHT).

      The example below shows how a RESTART + job restore can produce a tainted ckp metadata path. A tainted ckp metadata path is a path in which there is more than 1 INFLIGHT file.

       

      NOTE: Although the check `instant.equals(this.currentInstant) && hasData` is sufficient to ensure that the first instant fetched from the ckpMetadataPath is correct, it is insufficient to prevent previous INFLIGHT instants from being fetched after `this.currentInstant` is updated in `flushData()` on the first successful checkpoint after recovering from a global failover. As such, the first instant committed after a failover will always be correct, but subsequent instants are not guaranteed to be fetched correctly, as the timeline is still tainted with an old INFLIGHT instant. Please refer to the subsequent sections for a concrete example.

       

      CORRECT Instant fetched after JM restores from checkpoint

      After the job restarts, commit `20220828165937711` is created and completes successfully.

       

      If the TM invokes `ckpMetaData#lastPendingInstant()` AFTER the JM runs `StreamWriteOperatorCoordinator#startInstant()`, it will read the NEW instant from the .aux hdfs path.

       

      `StreamWriteOperatorCoordinator#startInstant()` will create the instant `20220828170053510`, causing there to be 2 INFLIGHT commits. Since the most recent INCOMPLETE instant is fetched, the correct instant, `20220828170053510`, will be returned.

       

      The .aux hdfs path will look like this when `ckpMetaData#lastPendingInstant()` is invoked.

       

      [
        Ckp{instant='20220828164426755', state='INFLIGHT'}, 
        Ckp{instant='20220828165937711', state='COMPLETED'}, 
        Ckp{instant='20220828170053510', state='INFLIGHT'}
      ] 

       

       

      INCORRECT Instant fetched after JM restores from checkpoint

      If the TM invokes `ckpMetaData#lastPendingInstant()` BEFORE the JM runs `StreamWriteOperatorCoordinator#startInstant()`, it will read the OLD instant from the .aux hdfs path.

      The .aux hdfs path will look like this when `ckpMetaData#lastPendingInstant()` is invoked.

       

      [
        Ckp{instant='20220828164123688', state='COMPLETED'}, 
        Ckp{instant='20220828164426755', state='INFLIGHT'}, 
        Ckp{instant='20220828165937711', state='COMPLETED'}
      ] 

       

      Since the new instant file has not been created yet, the most recent INCOMPLETE instant is fetched, which is `20220828164426755`.

      In such a case, different TMs can obtain different instant times from `ckpMetaData#lastPendingInstant()`, so a single commit might contain multiple instants.
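
      To make the selection rule concrete, below is a minimal Java sketch of how `lastPendingInstant()` effectively behaves according to this report: scan the ckp messages in timeline order and return the latest instant whose state is not COMPLETED. The exact implementation in Hudi's `CkpMetadata` may differ.

      import java.util.Arrays;
      import java.util.List;

      class LastPendingInstantSketch {
          record Ckp(String instant, String state) {}

          // Returns the latest instant that is not COMPLETED, or null if none.
          static String lastPendingInstant(List<Ckp> messages) {
              String pending = null;
              for (Ckp ckp : messages) { // messages are sorted by instant time
                  if (!"COMPLETED".equals(ckp.state())) {
                      pending = ckp.instant();
                  }
              }
              return pending;
          }

          public static void main(String[] args) {
              // The tainted path from the example above: before the JM starts
              // the new instant, the old INFLIGHT instant is returned.
              List<Ckp> beforeStartInstant = Arrays.asList(
                  new Ckp("20220828164123688", "COMPLETED"),
                  new Ckp("20220828164426755", "INFLIGHT"),
                  new Ckp("20220828165937711", "COMPLETED"));
              System.out.println(lastPendingInstant(beforeStartInstant)); // 20220828164426755
          }
      }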

       

      FileNotFoundException and various parquet corruption errors

      Building upon the example in the single commit multiple instant error, when `AppendWriteFunction#flushData()` is invoked, the current `writerHelper` will be cleaned up by closing all existing file handles. The `writerHelper` will then be set to NULL.

      At this point, Hudi is performing a checkpoint and is about to write to Hudi's timeline by creating a `.commit` file with the commit's metadata.

      Suppose that a TM (let's call this the tainted TM) is using an old instant from the tainted ckp metadata path below (`20220828164426755` will be used):

      [
        Ckp{instant='20220828164123688', state='COMPLETED'}, 
        Ckp{instant='20220828164426755', state='INFLIGHT'}, 
        Ckp{instant='20220828165937711', state='COMPLETED'},
        Ckp{instant='20220828170053510', state='INFLIGHT'}
      ] 

       

      Once `AppendWriteFunction#flushData()` has completed, on the next invocation of `AppendWriteFunction#processElement()`, `AppendWriteFunction#initWriterHelper()` will be invoked, causing a new `writerHelper` to be created with the instant `20220828170053510`; writing will then begin to files of this instant.
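
      A compressed sketch of this flush/re-init sequence is shown below. The `WriterHelper` stub and method bodies are illustrative stand-ins for the Hudi classes, not the real implementation:

      class AppendWriteLifecycleSketch {
          // Stub standing in for the real bulk-insert writer helper.
          static final class WriterHelper {
              final String instant;
              WriterHelper(String instant) { this.instant = instant; }
              void close() { /* close all open parquet file handles */ }
          }

          private WriterHelper writerHelper;

          void flushData() {
              if (writerHelper != null) {
                  writerHelper.close(); // close all existing file handles
                  writerHelper = null;  // next element lazily re-creates the helper
              }
          }

          void processElement(Object row, String lastPendingInstant) {
              if (writerHelper == null) {
                  // Re-initialised with whatever instant is pending *now*; in the
                  // tainted scenario this is the new instant 20220828170053510,
                  // while the checkpoint for the old instant is still committing.
                  writerHelper = new WriterHelper(lastPendingInstant);
              }
              // ... buffer/write the row with writerHelper ...
          }
      }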

       

      While all this is happening, Flink is performing a checkpoint and is writing to Hudi's timeline by creating a `.commit` file with the commit's metadata.

      Before it can create and write to the `.commit` file, `HoodieTable#finalizeWrite()` will be invoked.

       

      Subsequently, `HoodieTable#reconcileAgainstMarkers()` is invoked. This method is supposed to clean up data files that were partially written by failed (but succeeded on retry) tasks.

      Since the tainted TM is using the instant that is being committed to the Hudi timeline, the files it is currently writing to will be cleaned up: their marker files can be found, but the corresponding data files are not found in the `HoodieWriteStat`.
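
      The cleanup rule amounts to a set difference, sketched below under the assumption (from this report) that files are identified by their data-file paths; the real `reconcileAgainstMarkers()` works on marker files and `HoodieWriteStat` entries:

      import java.util.HashSet;
      import java.util.Set;

      class ReconcileSketch {
          // Files that have a marker but are absent from the commit's write
          // stats are treated as leftovers from failed tasks and deleted.
          static Set<String> filesToDelete(Set<String> markerDataFiles,
                                           Set<String> committedDataFiles) {
              Set<String> invalid = new HashSet<>(markerDataFiles);
              invalid.removeAll(committedDataFiles);
              return invalid; // the tainted TM's open files land in this set
          }

          public static void main(String[] args) {
              Set<String> markers = Set.of(
                  "f1_20220828170053510.parquet",  // still being written by the tainted TM
                  "f0_20220828170053510.parquet"); // written and reported by a healthy TM
              Set<String> committed = Set.of("f0_20220828170053510.parquet");
              System.out.println(filesToDelete(markers, committed));
              // [f1_20220828170053510.parquet] -> closing it later throws FileNotFoundException
          }
      }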

       

      In essence, the tainted TM is starting a write cycle when it should not be doing so, with an instant that is being committed to the Hudi timeline, hence causing files to be deleted, corrupted, or left in the wrong state.

       

      Hence, while the JM is performing a checkpoint + commit, the tainted TM might try to close a file handle once the file has reached `parquet.max.filesize`. By then, the file might have already been deleted by `HoodieTable#reconcileAgainstMarkers()`, causing the `java.io.FileNotFoundException`.

       

      Logs:

      Refer to comments 

      Fix:

      Ensure that the instant fetched from the ckpMetaPath is always larger than `this.currentInstant` (sketched below).
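
      Since Hudi instant times are fixed-width timestamp strings, "larger" can be checked with a plain lexicographic comparison. A minimal sketch of such a guard, with an illustrative method name:

      class MonotonicInstantSketch {
          // Accept an instant from the ckp metadata path only if it is strictly
          // newer than the instant this function last wrote with.
          static boolean isValidNextInstant(String fetched, String currentInstant) {
              if (fetched == null) {
                  return false;
              }
              // Instant times like "20220828170053510" are fixed-width, so
              // lexicographic order equals chronological order.
              return currentInstant == null || fetched.compareTo(currentInstant) > 0;
          }
      }

      For example, against the tainted path above, `isValidNextInstant("20220828164426755", "20220828165937711")` returns false, so the old INFLIGHT instant is rejected.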

      Attachments

        1. error.log
          10 kB
          Teng Huo
        2. correct.log
          10 kB
          Teng Huo

            People

              Assignee: voonhous voon
              Reporter: voonhous voon
