[SPARK-22504] Optimization in overwrite table in case of failure

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: SQL

    Description


      1. SCENARIO

      Currently, the `Overwrite` operation in Spark is performed in the following steps:
      1. DROP: drop the old table
      2. WRITE: create and write the data into a new table
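
      Conceptually, this is equivalent to the following simplified sketch (an illustration of the observed behavior, not the actual Spark implementation; the helper name `overwriteToday` is hypothetical):

      ```scala
      import org.apache.spark.sql.{DataFrame, SparkSession}

      // Simplified illustration of the current DROP-then-WRITE behavior.
      def overwriteToday(spark: SparkSession, df: DataFrame, tableName: String): Unit = {
        spark.sql(s"DROP TABLE IF EXISTS $tableName")      // step 1: the old data is gone here
        df.write.format("parquet").saveAsTable(tableName)  // step 2: if this throws, nothing is left
      }
      ```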

      If a runtime error occurs in Step2, the original table is lost along with its data. I think this is a serious problem if someone performs `read-update-flushback` actions. The problem can be reproduced with the following code:

      ```scala
      01: test("test spark df overwrite failed") {
      02:   // prepare table
      03:   val tableName = "test_spark_overwrite_failed"
      04:   sql(s"DROP TABLE IF EXISTS $tableName")
      05:   sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
      06:     s" STORED AS parquet").collect()
      07:
      08:   // load data first
      09:   val schema = StructType(
      10:     Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
      11:       StructField("field_string", DataTypes.StringType, nullable = false)))
      12:   val rdd1 = sqlContext.sparkContext.parallelize(
      13:     Row(20, "q") ::
      14:     Row(21, "qw") ::
      15:     Row(23, "qwe") :: Nil)
      16:   val dataFrame = sqlContext.createDataFrame(rdd1, schema)
      17:   dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
      18:   sql(s"SELECT * FROM $tableName").show()
      19:
      20:   // load data again, the following data will cause failure in data loading
      21:   try {
      22:     val rdd2 = sqlContext.sparkContext.parallelize(
      23:       Row(31, "qwer") ::
      24:       Row(null, "qwer") ::
      25:       Row(32, "long_than_5") :: Nil)
      26:     val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
      27:
      28:     dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
      29:   } catch {
      30:     case e: Exception => LOGGER.error(e, "write overwrite failure")
      31:   }
      32:   // table `test_spark_overwrite_failed` has been dropped
      33:   sql(s"show tables").show(20, truncate = false)
      34:   // the content is empty even if the table exists; we want it to show the same data as Line18
      35:   sql(s"SELECT * FROM $tableName").show()
      36: }
      ```
      In Line24, we create a `null` element while the schema declares the field as not nullable, which causes a runtime error while loading the data.
      In Line33, table `test_spark_overwrite_failed` has already been dropped and no longer exists in the current database, so of course Line35 fails.
      Instead, we want Line35 to show the original data, just as Line18 does.

      2. ANALYSIS

      I am thinking of optimizing `overwrite` in Spark. The goal is to keep the old data until the new load has finished successfully; the old data is cleaned up only once the load succeeds.

      Since Spark SQL already supports the `rename` operation, we can optimize `overwrite` into the following steps:
      1. WRITE: create and write the data to a temporary table
      2. SWAP: swap the temporary table with the target table using the rename operation
      3. CLEAN: clean up the old data

      If Step1 succeeds, swap the temporary table with the target table and clean up the old data; otherwise, leave the target table unchanged. A minimal sketch of this write-swap-clean flow follows.
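
      The sketch below assumes a `SparkSession` named `spark`; the helper `safeOverwrite` and the staging/backup table names are hypothetical illustrations, not existing Spark API:

      ```scala
      import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

      // Hypothetical helper sketching the proposed flow; names are illustrative.
      def safeOverwrite(spark: SparkSession, df: DataFrame, targetTable: String): Unit = {
        val stagingTable = s"${targetTable}_staging"
        val backupTable  = s"${targetTable}_backup"

        // 1. WRITE: materialize the new data into a staging table first.
        //    If this throws, targetTable has not been touched.
        df.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(stagingTable)

        // 2. SWAP: the write succeeded, so swap the tables via rename.
        spark.sql(s"ALTER TABLE $targetTable RENAME TO $backupTable")
        spark.sql(s"ALTER TABLE $stagingTable RENAME TO $targetTable")

        // 3. CLEAN: only now is it safe to drop the old data.
        spark.sql(s"DROP TABLE IF EXISTS $backupTable")
      }
      ```

      With this ordering, a failure in Step1 propagates before any rename happens, so the target table still holds the old data; a failure between the two renames leaves the old data recoverable under the backup name.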

People

    Assignee: Unassigned
    Reporter: Chuanyin Xu (xuchuanyin)
    Votes: 0
    Watchers: 3
