Details
Type: Question
Status: Resolved
Priority: Minor
Resolution: Invalid
Affects Version/s: 2.1.0
Fix Version/s: None
Component/s: None
Environment: jdk1.7.0_67, spark-hive_2.11
Description
I ran into an issue with spark-sql. After building a Dataset through some logic, I want to persist it because it will be reused many times later. However, when I persist it, the logic is computed twice. So I wrote a local test to reproduce the issue, and it does reproduce.
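To make the pattern I mean concrete (the names here are purely illustrative, not from my real job):

val ds = expensiveLogic(input) // e.g. a withColumn that calls a costly UDF
ds.persist()                   // only marks the Dataset for caching; persist() is lazy
ds.show()                      // first action: the logic should run exactly once here
ds.count()                     // later actions: should be served from the cache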
I tested three filter functions and found the following:
.filter(col("id") > 10) // expected
.filter(length(col("name")) > 4) // expected
.filter(size(col("seq_name")) > 1) // unexpected when this filter is present
i.e., the double-calculation issue occurs only when the filter on the UDF-generated column (seq_name) is present; see the sketch after this list.
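For what it is worth, my understanding of the cause (an assumption on my part, based on how Catalyst treats deterministic expressions, not something confirmed in this ticket): when a filter references only input columns, the predicate is pushed below the projection without touching the UDF; when the filter references the UDF-generated column, the optimizer substitutes the deterministic UDF expression into the pushed-down predicate while also keeping it in the projection, so the UDF body can run twice per surviving row. A minimal self-contained sketch that makes the invocation count observable (splitUdf and UdfDoubleEvalSketch are names I made up; the counter is only reliable in local mode, where driver and executor share one JVM):

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object UdfDoubleEvalSketch {
  /* invocation counter; only reliable in local mode (single JVM) */
  val calls = new AtomicLong(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val splitUdf = udf { (name: String) =>
      calls.incrementAndGet() // count every invocation of the UDF body
      name.split("a").toSeq   // any Seq-producing logic stands in for seqTokenUdf
    }

    val ds = Seq("alpha", "beta", "gamma").toDF("name")
      .withColumn("seq_name", splitUdf(col("name")))

    /* filter on an input column: the predicate is pushed below the projection
       without touching the UDF, so the UDF runs at most once per row */
    ds.filter(length(col("name")) > 4).collect()
    println("filter on input column: " + calls.get() + " UDF calls")

    calls.set(0)

    /* filter on the UDF output column: the deterministic UDF expression is
       substituted into the pushed-down predicate and also kept in the
       projection, so it can run twice for every row that passes the filter */
    ds.filter(size(col("seq_name")) > 1).collect()
    println("filter on UDF column: " + calls.get() + " UDF calls")
  }
}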
Result images (attached screenshots): expected, unexpected
Reproduction code
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object TwiceCalculationReproducer {
  private val separator = scala.reflect.io.File.separator
  private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"

  System.setProperty("spark.app.name", "TestController")
  System.setProperty("spark.master", "local[2]")
  private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
  ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(dirName), true)
    Thread.sleep(1000)

    /* expected */
    val tableRaw = Dims.df
    val tableNewExp = seqColumnGeneratorExpected(tableRaw)
    tableNewExp.persist()
    tableNewExp.show(10, 100)

    /* unexpected */
    Thread.sleep(1000)
    val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
    tableNewUnexp.persist()
    tableNewUnexp.show(10, 100)
  }

  /* normal: only adds the UDF-generated column */
  def seqColumnGeneratorExpected[T](ds: Dataset[T]) = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
  }

  /* abnormal: filters on top of the UDF-generated column */
  def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
    seqColumnGeneratorExpected(ds)
      .filter(col("id") > 10)            // expected
      .filter(length(col("name")) > 4)   // expected
      .filter(size(col("seq_name")) > 1) // unexpected when this filter is present
  }

  /* validator udf: records every invocation */
  def seqTokenUdf = udf { (id: Int, name: String) =>
    /* validator 1: console print */
    println(name + "_" + id + "_" + System.currentTimeMillis())
    /* validator 2: write a file in case the executor does not print to the console */
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.setWriteChecksum(false)
    val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString).mkString(separator)
    fs.create(new Path(fileName)).close()
    /* return */
    Seq[String](name, System.currentTimeMillis().toString)
  }

  /* test data mock */
  object Dims {
    private val structTypes = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))
    private val data = List(
      Row(100, "file100"),
      Row(101, "file101"),
      Row(102, "file102")
    )
    private val rdd = sc.parallelize(data)
    val df = ss.createDataFrame(rdd, structTypes)
  }
}
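If it helps, here is a sketch of two possible mitigations. These are my own workaround ideas, not a confirmed fix from this ticket; the snippet reuses tableRaw and seqColumnGeneratorExpected from the reproducer above and is meant to stand in for the unexpected block inside main:

/* 1) materialize before filtering: persist() alone is lazy, so an eager
      action such as count() is needed to populate the cache; the filter
      then scans cached rows and the UDF is not re-evaluated */
val materialized = seqColumnGeneratorExpected(tableRaw).persist()
materialized.count() // runs the UDF exactly once per row while building the cache
materialized.filter(size(col("seq_name")) > 1).show(10, 100)

/* 2) on Spark 2.3+ (not available on the 2.1.0 affected here), a UDF can be
      marked non-deterministic so Catalyst will not duplicate it into a
      pushed-down filter condition */
// val safeUdf = seqTokenUdf.asNondeterministic()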
Is there any problem with the size() function?
Kind regards,
chenfh5