Apache Hudi / HUDI-4741

Deadlock when restarting failed TM in AbstractStreamWriteFunction


Details

    Description

      Summary of Events

      1. The TM heartbeat is not sent to the JM (this can be triggered by killing a container), so the JM kills the TM/container
      2. The JM restarts the container, but the restart is not handled properly, which causes a deadlock
      3. The deadlock causes instantToWrite() to loop for 10 minutes (the default Flink checkpoint timeout), resulting in an instant initialization timeout error
      4. The JM is restarted
      5. The JM restores state from the last successful checkpoint
      6. The issue in HUDI-4907 occurs

       

      Code for reproducing

      Flink SQL Code

      CREATE TABLE input_table (
          `val`               STRING
          ,`event_time`       TIMESTAMP(3)
          ,`partition`        BIGINT
          ,`offset`           BIGINT
      ) WITH (
          'connector' = 'datagen',
          'fields.val.length' = '99999',
          'rows-per-second' = '15000'
      );

      CREATE TABLE test_hudi
      (
          `val`                 STRING
          ,`event_time`       TIMESTAMP(3)
          ,`partition`        BIGINT
          ,`offset`           BIGINT
          ,`dt`               STRING
          ,`hh`               STRING
      ) PARTITIONED BY (dt, hh)
      WITH (
          'connector' = 'hudi',
          'path' = 'hdfs://jm_tm_sync_error/',
          'table.type' = 'COPY_ON_WRITE',
          'write.operation' = 'insert',
          'hoodie.parquet.small.file.limit' = '104857600',
          'hoodie.parquet.max.file.size' = '268435456',
          'hoodie.datasource.write.recordkey.field' = 'partition,offset',
          'hoodie.datasource.write.hive_style_partitioning' = 'true',
          'hoodie.datasource.write.partitionpath.field' = 'dt,hh',
          'write.bulk_insert.sort_input' = 'false',
          'index.bootstrap.enabled' = 'false',
          'index.state.ttl' = '60',
          'index.type' = 'FLINK_STATE',
          'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
          'write.tasks' = '8',
          'hive_sync.enable' = 'false'
      );

      insert into test_hudi
      select  `val`
              ,`event_time`
              ,`partition`
              ,`offset`
              ,DATE_FORMAT(event_time, 'yyyy-MM-dd')
              ,DATE_FORMAT(event_time, 'HH')
       from input_table; 

       

      Advanced Properties

      execution.checkpointing.interval=60000ms 

       

      Job Profile Properties

      flink.version=1.13.14
      default.parallelism=8
      restart.from.savepoint=true
      sql.job.mode=normal
      running.mode=streaming
      slots.per.tm=2
      cpu.per.tm=2vcore
      memory.per.tm=6G
      jvm.heap.ratio=70% 

       

       

      Issue: a TM failing and being restarted in a new container causes a deadlock

      1. When a TM fails, starting and restoring a TM in a new container creates a deadlock:
        • The TM is waiting for the JM to create a new INFLIGHT instant
        • The JM is waiting for the TM to send a successful WriteMetadataEvent
      2. The deadlock causes one of the errors below:
        • org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
        • org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
      3. This triggers a global failure recovery, logged as: org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
      4. The JM tries to restore itself from the last successful checkpoint
      5. This causes HUDI-4907
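
      The circular wait in step 1 can be illustrated with a minimal, self-contained sketch. This is not Hudi or Flink code: the class `CircularWaitSketch`, the two latches, and the short timeout are hypothetical stand-ins for "JM creates a new INFLIGHT instant" and "TM sends a WriteMetadataEvent". Each side only signals after it has received the other side's signal, so both waits expire.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the circular wait; names do not come from Hudi or Flink.
public class CircularWaitSketch {

    /** Whether each side received the signal it was waiting for. */
    public static final class Outcome {
        public final boolean tmGotNewInstant;
        public final boolean jmGotWriteEvent;

        Outcome(boolean tmGotNewInstant, boolean jmGotWriteEvent) {
            this.tmGotNewInstant = tmGotNewInstant;
            this.jmGotWriteEvent = jmGotWriteEvent;
        }
    }

    public static Outcome run(long timeoutMillis) {
        CountDownLatch newInstantCreated = new CountDownLatch(1); // signalled by the "JM"
        CountDownLatch writeEventSent = new CountDownLatch(1);    // signalled by the "TM"
        boolean[] result = new boolean[2];

        // "TM": waits for a new instant before it would send its write event.
        Thread tm = new Thread(() -> {
            try {
                result[0] = newInstantCreated.await(timeoutMillis, TimeUnit.MILLISECONDS);
                if (result[0]) {
                    writeEventSent.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // "JM": waits for the write event before it would create a new instant.
        Thread jm = new Thread(() -> {
            try {
                result[1] = writeEventSent.await(timeoutMillis, TimeUnit.MILLISECONDS);
                if (result[1]) {
                    newInstantCreated.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        tm.start();
        jm.start();
        try {
            tm.join();
            jm.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sketch threads", e);
        }
        return new Outcome(result[0], result[1]);
    }

    public static void main(String[] args) {
        Outcome o = run(200);
        System.out.println("TM got new instant: " + o.tmGotNewInstant);
        System.out.println("JM got write event: " + o.jmGotWriteEvent);
    }
}
```

      In the sketch neither side ever makes progress on its own; only the timeout ends the wait, which mirrors the timeout errors listed in step 2.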

      Root cause

      When restoring the TM, `AbstractStreamWriteFunction#initializeState()` attempts to restore the TM's state. At this stage, `this.currentInstant` is initialized by invoking `lastPendingInstant()`, which loads the ckp metadata path and returns an INFLIGHT instant.

       

      When `instantToWrite()` is then invoked, `instant.equals(this.currentInstant)` is always true, because the local `instant` is also obtained from `lastPendingInstant()`. The current implementation is therefore stuck in a loop until the instant initialization timeout is hit: `lastPendingInstant()`, which governs both `instant` and `this.currentInstant`, keeps returning the same value because the state of the ckp metadata path never changes.

       

      The state never changes because the JM is waiting for the TM to finish writing the batch for the current INFLIGHT instant, while the TM is waiting for the JM to create a new INFLIGHT instant. Hence the deadlock.
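
      The loop described above can be sketched as follows. This is a simplified model under stated assumptions, not the actual `AbstractStreamWriteFunction` code: the class `InstantToWriteSketch`, the method `waitForNewInstant`, the `Supplier` standing in for `lastPendingInstant()`, and the instant values are all hypothetical. It only shows why polling a source whose value equals the restored `currentInstant` can end in nothing but a timeout.

```java
import java.util.function.Supplier;

// Hypothetical model of the polling loop; not the real Hudi implementation.
public class InstantToWriteSketch {

    /**
     * Polls lastPendingInstant until it returns an instant different from
     * currentInstant, or throws once timeoutMillis has elapsed.
     */
    public static String waitForNewInstant(String currentInstant,
                                           Supplier<String> lastPendingInstant,
                                           long timeoutMillis,
                                           long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        String instant = lastPendingInstant.get();
        // After a TM restart, currentInstant was itself read from the same
        // source, so this condition stays true until the source changes.
        while (instant == null || instant.equals(currentInstant)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException(
                    "Timeout(" + timeoutMillis + "ms) while waiting for instant initialize");
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            instant = lastPendingInstant.get();
        }
        return instant;
    }

    public static void main(String[] args) {
        String restored = "20220901120000"; // hypothetical instant time

        // Restored TM: the polled source never moves past the restored instant.
        try {
            waitForNewInstant(restored, () -> restored, 300, 50);
        } catch (IllegalStateException e) {
            System.out.println("stuck: " + e.getMessage());
        }

        // Healthy case: the coordinator has already started a newer instant.
        String next = waitForNewInstant(restored, () -> "20220901120100", 300, 50);
        System.out.println("new instant: " + next);
    }
}
```

      In this model the loop can only exit if something outside the TM changes what the polled source returns, and in the scenario described above nothing does.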

       

      The short-term fix is to enforce a global failover every time there is a failure.

          People

            voonhous voon
            Jian Feng, Teng Huo