Apache Hudi / HUDI-2549

Exceptions when using a second writer into a Hudi table managed by DeltaStreamer


Details

    Description

      When running the DeltaStreamer along with a second Spark datasource writer (with ZK-based OCC enabled), we receive the following exception, which halts the Spark datasource writer. This occurs following warnings of timeline inconsistencies:

       

      21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
      21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
      21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
      21/10/07 17:10:05 INFO TransactionManager: Transaction ended
      Exception in thread "main" java.lang.IllegalArgumentException
              at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
              at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
              at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
              at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
              at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
              at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
              at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
              at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
              at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
              at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
              at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
              at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
              at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
              at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
              at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
              at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
              at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
              at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
              at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
              at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
              at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
              at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
              at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
              at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
              at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
              at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
              at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
              at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
              at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
              at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
              at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
              at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
              at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
              at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
              at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
      

      The validation at ValidationUtils.checkArgument fails because the expected commit file was not present on DFS.
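
      For context, a minimal illustration of why the trace above carries no message: Hudi's ValidationUtils.checkArgument(boolean) throws a bare java.lang.IllegalArgumentException when its condition is false. The sketch below is illustrative only (it is not the actual HoodieActiveTimeline.transitionState code) and assumes the failing condition is the presence of the expected commit meta file on DFS, as described above.

      // Illustrative sketch only -- not the actual transitionState() implementation.
      // Assumption: the condition that fails is "the expected commit/inflight meta file
      // exists on DFS", per the observation above. checkArgument(false) throws a bare
      // java.lang.IllegalArgumentException, which matches the message-less trace.
      import org.apache.hudi.common.util.ValidationUtils

      val expectedMetaFileExistsOnDfs = false // what the second writer observes after heavy timeline churn
      ValidationUtils.checkArgument(expectedMetaFileExistsOnDfs) // => IllegalArgumentException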

       
      The test setup is this:

      • DeltaStreamer in continuous mode, with small batch sizes and very fast commit times (a high commit rate: one commit every 10-30 seconds)
      • A Spark datasource writer moving flat Parquet files from a source bucket into the table maintained by the DeltaStreamer (a rough configuration sketch of this writer follows the list)
      • The Spark datasource writer is much slower than the DeltaStreamer, so its time-to-first-commit is about 2-8 minutes
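
      For reference, a minimal sketch of the second (Spark datasource) writer as described above, assuming multi-writer OCC via the ZookeeperBasedLockProvider. The table name, table path, record key, partition path, precombine field, and ZooKeeper host are hypothetical placeholders; the lock base path and lock key are taken from the log output in this report.

      import org.apache.spark.sql.{SaveMode, SparkSession}

      val spark = SparkSession.builder().appName("hudi-second-writer").getOrCreate()

      // Hypothetical source: flat parquet files landing in a staging bucket
      val df = spark.read.parquet("s3://source-bucket/events-staging/")

      df.write
        .format("hudi")
        .option("hoodie.table.name", "events_mwc_test_v1")                   // assumed from the lock key
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.recordkey.field", "event_id")       // hypothetical
        .option("hoodie.datasource.write.partitionpath.field", "event_date") // hypothetical
        .option("hoodie.datasource.write.precombine.field", "ts")            // hypothetical
        // Optimistic concurrency control, shared with the DeltaStreamer writer
        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
        .option("hoodie.write.lock.provider",
          "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
        .option("hoodie.write.lock.zookeeper.url", "zk-host")                // hypothetical
        .option("hoodie.write.lock.zookeeper.port", "2181")
        .option("hoodie.write.lock.zookeeper.base_path", "/events/test/mwc/v1")
        .option("hoodie.write.lock.zookeeper.lock_key", "events_mwc_test_v1")
        .mode(SaveMode.Append)
        .save("s3://target-bucket/events/test/mwc/v1")                       // hypothetical path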

      What I see happen is the DeltaStreamer finalizing many commits while the Spark datasource writer is performing its write. It appears that the timeline changes so much, so fast, that the Spark datasource writer becomes "out of sync" in ways from which it cannot recover. I see the Exception in thread "main" java.lang.IllegalArgumentException error on the first commit of the datasource writer every time. This appears to be a race condition when the rate of change of the Hudi timeline is very high (due to a fast DeltaStreamer process). The Spark datasource writer does not properly sync those timeline changes in a multi-writer configuration, which causes enough inconsistency to crash the job.

       

People

  Assignee: Dave Hagman
  Reporter: Dave Hagman