Uploaded image for project: 'CarbonData'
  1. CarbonData
  2. CARBONDATA-3851

Merge Update and Insert with Partition Table is giving different results in different spark deploy modes

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 2.0.0
    • 2.1.0
    • spark-integration
    • None

    Description

      The result sets differ when the same queries are run with spark-shell --master local and with spark-shell --master yarn (two different Spark deploy modes).

      Steps to Reproduce Issue :

      // Imports used by the reproduction script (spark-shell session).
      import scala.collection.JavaConverters._
      import java.sql.Date
      import org.apache.spark.sql._
      import org.apache.spark.sql.CarbonSession._
      import org.apache.spark.sql.catalyst.TableIdentifier
      import org.apache.spark.sql.execution.command.mutation.merge.{
        CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction,
        MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched,
        WhenNotMatchedAndExistsOnlyOnTarget
      }

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.test.util.QueryTest
      import org.apache.spark.sql.types.{
        BooleanType, DateType, IntegerType, StringType, StructField, StructType
      }
      import spark.implicits._
      
      // Drop both tables so the run starts from a clean state, then create the
      // partitioned history table that the merge actions write into.
      sql("drop table if exists order").show()
      sql("drop table if exists order_hist").show()
      sql("create table order_hist(id string, name string, quantity int, price int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()

      // Ten initial orders (id1..id10), all with state 1.
      val initframe = sc
        .parallelize(1 to 10, 4)
        .map(x => (s"id$x", s"order$x", s"customer$x", x * 10, x * 75, 1))
        .toDF("id", "name", "c_name", "quantity", "price", "state")

      // Write them as the target table `order`, partitioned by c_name.
      initframe.write
        .format("carbondata")
        .option("tableName", "order")
        .option("partitionColumns", "c_name")
        .mode(SaveMode.Overwrite)
        .save()
      
      // Target side of the merge: the `order` table, aliased as "A".
      val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
      val dwSelframe = dwframe.as("A")

      // Source rows for id3..id10; id3 and id4 carry state 2, the others state 1.
      val ds1 = sc
        .parallelize(3 to 10, 4)
        .map { x =>
          val state = if (x <= 4) 2 else 1
          (s"id$x", s"order$x", s"customer$x", x * 10, x * 75, state)
        }
        .toDF("id", "name", "c_name", "quantity", "price", "state")

      ds1.show()

      // Two rows with ids newid1 and newid2. No column names are assigned here;
      // Dataset.union matches columns by position, so the tuple order must match ds1.
      val ds2 = sc
        .parallelize(1 to 2, 4)
        .map(x => (s"newid$x", s"order$x", s"customer$x", x * 10, x * 75, 1))
        .toDS()
        .toDF()

      ds2.show()

      val ds3 = ds1.union(ds2)

      ds3.show()

      // Source side of the merge, aliased as "B".
      val odsframe = ds3.as("B")
      
      // Assignments applied to a matched target row: keep the id, take the
      // source price plus one, and copy the source state.
      val updateMap = Map(
        col("id") -> col("A.id"),
        col("price") -> expr("B.price + 1"),
        col("state") -> col("B.state"))

      // Values for a source row that has no match in the target.
      val insertMap = Map(
        col("id") -> col("B.id"),
        col("name") -> col("B.name"),
        col("c_name") -> col("B.c_name"),
        col("quantity") -> col("B.quantity"),
        col("price") -> expr("B.price * 100"),
        col("state") -> col("B.state"))

      // History row for an updated order; c_name is set to the literal "insert".
      val insertMap_u = Map(
        col("id") -> col("id"),
        col("name") -> col("name"),
        col("c_name") -> lit("insert"),
        col("quantity") -> col("quantity"),
        col("price") -> expr("price"),
        col("state") -> col("state"))

      // History row for a deleted order; c_name is set to the literal "delete".
      val insertMap_d = Map(
        col("id") -> col("id"),
        col("name") -> col("name"),
        col("c_name") -> lit("delete"),
        col("quantity") -> col("quantity"),
        col("price") -> expr("price"),
        col("state") -> col("state"))

      // The three merge clauses, in the order they were originally appended:
      //   1. matched rows whose state differs -> update + history row
      //   2. rows only in the source          -> insert
      //   3. rows only in the target          -> delete + history row
      val matches: Seq[MergeMatch] = Seq(
        WhenMatched(Some(col("A.state") =!= col("B.state")))
          .addAction(UpdateAction(updateMap))
          .addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist"))),
        WhenNotMatched()
          .addAction(InsertAction(insertMap)),
        WhenNotMatchedAndExistsOnlyOnTarget()
          .addAction(DeleteAction())
          .addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))

      // NOTE(review): the steps as posted stop after building `matches` and never
      // run the merge, although CarbonMergeDataSetCommand and MergeDataSetMatches
      // are imported. The join on A.id = B.id is an assumption to be confirmed
      // with the reporter; it is consistent with the local-mode results reported
      // for this issue (two updates, two inserts, two deletes).
      CarbonMergeDataSetCommand(
        dwSelframe,
        odsframe,
        MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(spark)
      

       
      SQL Queries :

      // Total number of rows in `order` (both runs below report 10).
      sql("select count(*) from order").show()
       // Rows in `order` with state = 2 (yarn reports 0, local reports 2).
       sql("select count(*) from order where state = 2").show()
       // Price of the row with id 'newid1' (yarn returns no row, local returns 7500).
       sql("select price from order where id = 'newid1'").show()
       // History rows tagged 'delete' in `order_hist` (yarn reports 0, local reports 2).
       sql("select count(*) from order_hist where c_name = 'delete'").show()
       // History rows tagged 'insert' in `order_hist` (yarn reports 0, local reports 2).
       sql("select count(*) from order_hist where c_name = 'insert'").show()
      

      Results in spark-shell --master yarn

      scala> sql("select count(*) from order").show()
      +--------+
      |count(1)|
      +--------+
      |      10|
      +--------+

      scala> sql("select count(*) from order where state = 2").show()
      +--------+
      |count(1)|
      +--------+
      |       0|
      +--------+

      scala> sql("select price from order where id = 'newid1'").show()
      +-----+
      |price|
      +-----+
      +-----+

      scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
      +--------+
      |count(1)|
      +--------+
      |       0|
      +--------+

      scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
      +--------+
      |count(1)|
      +--------+
      |       0|
      +--------+
      

      Results in spark-shell --master local

      scala> sql("select count(*) from order").show()
      +--------+
      |count(1)|
      +--------+
      |      10|
      +--------+

      scala> sql("select count(*) from order where state = 2").show()
      +--------+
      |count(1)|
      +--------+
      |       2|
      +--------+

      scala> sql("select price from order where id = 'newid1'").show()
      +-----+
      |price|
      +-----+
      | 7500|
      +-----+

      scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
      +--------+
      |count(1)|
      +--------+
      |       2|
      +--------+

      scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
      +--------+
      |count(1)|
      +--------+
      |       2|
      +--------+
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            sachin1729 Sachin Ramachandra Setty
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: