Apache Hudi / HUDI-739

HoodieIOException: Could not delete in-flight instant


    Description

      We are evaluating Hudi for our near-real-time ingestion needs, comparing it against other solutions (Delta/Iceberg). We picked Hudi because it comes pre-installed with Amazon EMR. However, adoption is blocked on this issue with concurrent small-batch write jobs (256 files each) to the same S3 path.

      Using Livy, we trigger Spark jobs that write Hudi tables to S3, on EMR with EMRFS active; paths use the "s3://" prefix. We write Spark SQL Datasets promoted up from RDDs. "hoodie.consistency.check.enabled" is set to true, the Spark serializer is Kryo, and the Hudi version is 0.5.0-incubating.
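
      For context, a minimal sketch of the session setup; only the Kryo serializer setting is from our actual configuration, and the app name is hypothetical:

      import org.apache.spark.sql.SparkSession;

      // Minimal sketch, assuming a plain SparkSession builder; "hudi-nrt-ingest"
      // is a hypothetical app name. The Kryo serializer line matches our setup.
      SparkSession spark = SparkSession.builder()
          .appName("hudi-nrt-ingest")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate();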

      On both COW and MOR tables, some of the submitted jobs fail with the exception below:

      org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
      	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
      	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
      	at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
      	at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
      	at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
      	at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
      	at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
      	at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
      	at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
      	at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
      	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
      	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
      	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
      	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
      	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
      	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
      	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
      

      The jobs are submitted in concurrent batches of 256 files each, over the same S3 path; in total some 8k files covering 6 hours of our data. A sketch of this submission pattern follows.
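
      A minimal sketch of that pattern, assuming a plain thread pool in place of Livy (in production each task is a Livy-submitted Spark job; allInputFiles and writeHudiBatch() are hypothetical names):

      import java.util.List;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      // Several batches of 256 files are written concurrently against the same
      // S3 base path. The ExecutorService stands in for Livy here; the pool size
      // is illustrative, and writeHudiBatch() wraps the write code shown below.
      ExecutorService pool = Executors.newFixedThreadPool(8);
      for (int i = 0; i < allInputFiles.size(); i += 256) {
          List<String> batch = allInputFiles.subList(i, Math.min(i + 256, allInputFiles.size()));
          pool.submit(() -> writeHudiBatch(batch));
      }
      pool.shutdown();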

      Writing happens with the following code (basePath is an S3 bucket):

      // Imports required by the snippet below
      import org.apache.hudi.DataSourceWriteOptions;
      import org.apache.hudi.config.HoodieWriteConfig;
      import org.apache.hudi.hive.MultiPartKeysValueExtractor;
      import org.apache.spark.sql.SaveMode;

      // Configs (edited)
      String databaseName = "nrt";
      String assumeYmdPartitions = "false";
      String extractorClass = MultiPartKeysValueExtractor.class.getName();
      String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL();
      String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL();
      String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
      String basePath = "s3://some_path_to_hudi"; // "s3a://" does not seem to matter, same exception
      String avroSchemaAsString = avroSchema.toString();
      String tableName = avroSchema.getName().toLowerCase().replace("avro", "");

      // Write the dataset as a Hudi MOR table, upserting on "id" and syncing to Hive
      eventsDataset.write()
          .format("org.apache.hudi")
          .option(HoodieWriteConfig.TABLE_NAME, tableName)
          .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType)
          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition_path")
          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
          .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
          .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), databaseName)
          .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), tableName)
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "tenant,year,month,day")
          .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUri)
          .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), assumeYmdPartitions)
          .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), extractorClass)
          .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), tableOperation)
          .mode(SaveMode.Append)
          .save(String.format("%s/%s", basePath, tableName));
      
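      For completeness, the "hoodie.consistency.check.enabled" setting mentioned above is passed as one more writer option on the same chain, shown in isolation here; we assume, as for the other keys, that the datasource forwards hoodie.* options to the write config:

      // Same DataFrameWriter chain as above, other options elided for brevity.
      eventsDataset.write()
          .format("org.apache.hudi")
          .option("hoodie.consistency.check.enabled", "true")
          // ... remaining options exactly as in the snippet above ...
          .mode(SaveMode.Append)
          .save(String.format("%s/%s", basePath, tableName));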

          People

            Assignee: sivabalan narayanan
            Reporter: Catalin Alexandru Zamfir