Details
- Improvement
- Status: Resolved
- Major
- Resolution: Incomplete
- 2.1.0
- None
Description
Optimization in overwrite table in case of failure
- SCENARIO
Currently, the `Overwrite` operation in Spark is performed in the following steps:
1. DROP: drop the old table
2. WRITE: create the new table and write data into it
If a runtime error occurs in Step2, the original table is lost along with its data. I think this is a serious problem if someone performs `read-update-flushback` actions. The problem can be reproduced by the following code:
```scala
01: test("test spark df overwrite failed") {
02: // prepare table
03: val tableName = "test_spark_overwrite_failed"
04: sql(s"DROP TABLE IF EXISTS $tableName")
05: sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
06: s" STORED AS parquet").collect()
07:
08: // load data first
09: val schema = StructType(
10: Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
11: StructField("field_string", DataTypes.StringType, nullable = false)))
12: val rdd1 = sqlContext.sparkContext.parallelize(
13: Row(20, "q") ::
14: Row(21, "qw") ::
15: Row(23, "qwe") :: Nil)
16: val dataFrame = sqlContext.createDataFrame(rdd1, schema)
17: dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
18: sql(s"SELECT * FROM $tableName").show()
19:
20: // load data again, the following data will cause failure in data loading
21:     try {
22:       val rdd2 = sqlContext.sparkContext.parallelize(
23:         Row(24, "x") ::
24:         Row(25, null) :: Nil)
25:       // `field_string` is declared notnull, so the null element above fails the load
26:       val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
27:       dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
28:     }
29:     catch {
30:       case e: Exception => LOGGER.error(e, "write overwrite failure")
31:     }
32:     // table `test_spark_overwrite_failed` has been dropped
33: sql(s"show tables").show(20, truncate = false)
34:     // the content is empty even if the table exists; we want it to be the same as Line18
35: sql(s"SELECT * FROM $tableName").show()
36: }
```
In Line24, we create a `null` element while the schema is `notnull`, which causes a runtime error when loading the data.
In Line33, table `test_spark_overwrite_failed` has already been dropped and no longer exists, so of course Line35 fails.
Instead, we want Line35 to show the original data, just as Line18 does.
- ANALYZE
I am thinking of optimizing `overwrite` in Spark. The goal is to keep the old data until the load has finished successfully; the old data is cleaned up only once the load succeeds.
Since Spark SQL already supports the `rename` operation, we can optimize `overwrite` with the following steps:
1. WRITE: create and write data to tempTable
2. SWAP: swap tempTable with targetTable using the rename operation
3. CLEAN: clean up old data
If Step1 succeeds, swap tempTable with targetTable and clean up the old data; otherwise, leave the target table unchanged.
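The three steps above can be sketched as follows. This is only an illustration, not an existing Spark API: the function name `overwriteSafely` and the `_tmp`/`_bak` table-name suffixes are assumptions, and collision handling plus recovery from a failure between the two renames are left out for brevity.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Sketch of the proposed WRITE -> SWAP -> CLEAN flow; names are illustrative.
def overwriteSafely(spark: SparkSession, df: DataFrame, target: String): Unit = {
  val temp = s"${target}_overwrite_tmp"
  val backup = s"${target}_overwrite_bak"
  try {
    // 1. WRITE: materialize the new data under a temporary name.
    //    If the load fails here, `target` is still untouched.
    df.write.format("parquet").mode(SaveMode.ErrorIfExists).saveAsTable(temp)
  } catch {
    case e: Exception =>
      spark.sql(s"DROP TABLE IF EXISTS $temp")
      throw e // caller still sees the failure, but the old table survives
  }
  // 2. SWAP: move the old table aside, then rename the temp table over it
  spark.sql(s"ALTER TABLE $target RENAME TO $backup")
  spark.sql(s"ALTER TABLE $temp RENAME TO $target")
  // 3. CLEAN: drop the old data only after the swap has succeeded
  spark.sql(s"DROP TABLE IF EXISTS $backup")
}
```

With this shape, the failing overwrite in the test above would leave `test_spark_overwrite_failed` readable with its original three rows.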