Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Version: 2.0.0
- None
Description
The result sets differ when the same queries are run in spark-shell --master local and in spark-shell --master yarn (i.e. with two different Spark master settings).
Steps to reproduce the issue:
import scala.collection.JavaConverters._
import java.sql.Date

import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

// Start from a clean state: target table `order` and history table `order_hist`.
sql("drop table if exists order").show()
sql("drop table if exists order_hist").show()
sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()

// Target data: id1..id10, all with state = 1, written as a carbondata table partitioned by c_name.
val initframe = sc.parallelize(1 to 10, 4)
  .map { x => ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDF("id", "name", "c_name", "quantity", "price", "state")
initframe.write
  .format("carbondata")
  .option("tableName", "order")
  .option("partitionColumns", "c_name")
  .mode(SaveMode.Overwrite)
  .save()

val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")

// Source rows id3..id10; id3 and id4 carry state = 2, so they differ from the target rows.
val ds1 = sc.parallelize(3 to 10, 4)
  .map { x =>
    if (x <= 4) {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 2)
    } else {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
    }
  }
  .toDF("id", "name", "c_name", "quantity", "price", "state")
ds1.show()

// Source-only rows newid1 and newid2, which have no match in the target.
val ds2 = sc.parallelize(1 to 2, 4)
  .map { x => ("newid" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDS()
  .toDF()
ds2.show()

// union resolves columns by position, so ds2's unnamed tuple columns line up with ds1's names.
val ds3 = ds1.union(ds2)
ds3.show()
val odsframe = ds3.as("B")

// Column mappings for the merge actions.
val updateMap = Map(
  col("id") -> col("A.id"),
  col("price") -> expr("B.price + 1"),
  col("state") -> col("B.state"))
val insertMap = Map(
  col("id") -> col("B.id"),
  col("name") -> col("B.name"),
  col("c_name") -> col("B.c_name"),
  col("quantity") -> col("B.quantity"),
  col("price") -> expr("B.price * 100"),
  col("state") -> col("B.state"))
val insertMap_u = Map(
  col("id") -> col("id"),
  col("name") -> col("name"),
  col("c_name") -> lit("insert"),
  col("quantity") -> col("quantity"),
  col("price") -> expr("price"),
  col("state") -> col("state"))
val insertMap_d = Map(
  col("id") -> col("id"),
  col("name") -> col("name"),
  col("c_name") -> lit("delete"),
  col("quantity") -> col("quantity"),
  col("price") -> expr("price"),
  col("state") -> col("state"))

// Merge rules: update changed rows (with history), insert source-only rows,
// delete target-only rows (with history).
val matches: Seq[MergeMatch] = Seq(
  WhenMatched(Some(col("A.state") =!= col("B.state")))
    .addAction(UpdateAction(updateMap))
    .addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))),
  WhenNotMatched()
    .addAction(InsertAction(insertMap)),
  WhenNotMatchedAndExistsOnlyOnTarget()
    .addAction(DeleteAction())
    .addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))

// Run the merge of source B into target A on id. Without this call nothing
// modifies `order` or `order_hist`, and the queries would only see the initial data.
CarbonMergeDataSetCommand(
  dwSelframe,
  odsframe,
  MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(spark)
SQL queries (run after the steps above):
// Expected values (matching the --master local run) are noted on each query.
sql("select count(*) from order").show()                                 // 10: id1, id2 deleted; newid1, newid2 inserted
sql("select count(*) from order where state = 2").show()                 // 2: id3 and id4 updated to state 2
sql("select price from order where id = 'newid1'").show()                // 7500: inserted with price 75 * 100
sql("select count(*) from order_hist where c_name = 'delete'").show()    // 2: history rows for deleted id1, id2
sql("select count(*) from order_hist where c_name = 'insert'").show()    // 2: history rows for updated id3, id4
Results with spark-shell --master yarn:

scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+

scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+

scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+
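Results with spark-shell --master local:

scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+

scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+

scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
| 7500|
+-----+

scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+

scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+

The local-mode results are the expected outcome of the merge:
- id1 and id2 exist only in the target, so they are deleted (2 'delete' history rows).
- id3 and id4 have a different state in the source, so they are updated to state 2 (2 'insert' history rows).
- newid1 and newid2 exist only in the source, so they are inserted (newid1 price = 75 * 100 = 7500).
- This leaves 10 rows in `order`.

The yarn-mode results match the `order` table as it was before the merge (10 rows, none with state 2, no newid1, empty history). When running on YARN, the merge appears to have had no effect.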