CarbonData / CARBONDATA-3852

CCD Merge with Partition Table gives different results in different Spark deploy modes


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version: 2.0.0
    • Fix Version: 2.1.0
    • Component: spark-integration
    • Labels: None

    Description

      The result sets differ when the same SQL queries are run in spark-shell --master local and spark-shell --master yarn (two different Spark deploy modes). The local-mode result is the expected one (keys a and b deleted, c updated to 200, e inserted); under yarn the merge leaves the partitioned target table unchanged.

      // Repro script for spark-shell; `spark` and `sc` are provided by the shell.
      import scala.collection.JavaConverters._
      import java.sql.Date
      import org.apache.spark.sql._
      import org.apache.spark.sql.CarbonSession._
      import org.apache.spark.sql.catalyst.TableIdentifier
      import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.test.util.QueryTest
      import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
      import spark.implicits._
      
      // Set-up of an "order" table and source frames; these are not referenced
      // by the failing merge on `target` below, but are kept from the report.
      val df1 = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, s"order$x", s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", "quantity", "price", "state")

      df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
      val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
      val dwSelframe = dwframe.as("A")

      val ds1 = sc.parallelize(3 to 10, 4)
            .map { x =>
              if (x <= 4) {
                ("id"+x, s"order$x", s"customer$x", x*10, x*75, 2)
              } else {
                ("id"+x, s"order$x", s"customer$x", x*10, x*75, 1)
              }
            }.toDF("id", "name", "c_name", "quantity", "price", "state")

      val ds2 = sc.parallelize(1 to 2, 4).map { x => ("newid"+x, s"order$x", s"customer$x", x*10, x*75, 1)}.toDS().toDF()
      val ds3 = ds1.union(ds2)
      val odsframe = ds3.as("B")
      	
      sql("drop table if exists target").show()
      
      val initframe = spark.createDataFrame(Seq(
        Row("a", "0"),
        Row("b", "1"),
        Row("c", "2"),
        Row("d", "3")
      ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
      
      initframe.write
        .format("carbondata")
        .option("tableName", "target")
        .option("partitionColumns", "value")
        .mode(SaveMode.Overwrite)
        .save()
        
      val target = spark.read.format("carbondata").option("tableName", "target").load()

      // Change-capture rows: (key, newValue, deleted, time). Several keys carry
      // more than one change, so the latest change per key has to win.
      var ccd =
        spark.createDataFrame(Seq(
          Row("a", "10", false, 0),
          Row("a", null, true, 1),
          Row("b", null, true, 2),
          Row("c", null, true, 3),
          Row("c", "20", false, 4),
          Row("c", "200", false, 5),
          Row("e", "100", false, 6)
        ).asJava,
          StructType(Seq(StructField("key", StringType),
            StructField("newValue", StringType),
            StructField("deleted", BooleanType), StructField("time", IntegerType))))

      ccd.createOrReplaceTempView("changes")
      
      ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
      
      // Column mappings shared by the update and insert actions.
      val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
      val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

      // CarbonData merge (available through the CarbonSession._ import above):
      // update on match, insert on no match, delete on matched-and-deleted.
      target.as("A").merge(ccd.as("B"), "A.key=B.key").
        whenMatched("B.deleted=false").
        updateExpr(updateMap).
        whenNotMatched("B.deleted=false").
        insertExpr(insertMap).
        whenMatched("B.deleted=true").
        delete().execute()
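
      As a sanity check (not part of the original report), the deduplicated
      change set can be printed just before the execute() call; it should be
      identical under either master, which narrows the divergence down to the
      merge-on-partitioned-table path itself:

      // Run just before execute(): the CDC input to the merge should not
      // depend on the deploy mode.
      ccd.orderBy("key").show(false)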
        
      

      SQL queries to run:

       
      sql("select count(*) from target").show()
      sql("select * from target order by key").show()
      

      Results in spark-shell --master yarn (incorrect: the target is unchanged)

      scala> sql("select count(*) from target").show()
      +--------+
      |count(1)|
      +--------+
      |       4|
      +--------+
      
      
      scala> sql("select * from target order by key").show()
      +---+-----+
      |key|value|
      +---+-----+
      |  a|    0|
      |  b|    1|
      |  c|    2|
      |  d|    3|
      +---+-----+
      

      Results in spark-shell --master local (expected result)

      scala> sql("select count(*) from target").show()
      +--------+
      |count(1)|
      +--------+
      |       3|
      +--------+
      
      
      scala> sql("select * from target order by key").show()
      +---+-----+
      |key|value|
      +---+-----+
      |  c|  200|
      |  d|    3|
      |  e|  100|
      +---+-----+
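
      For reference, the expected contents of target can be derived with plain
      Spark alone, independent of CarbonData and of the deploy mode. The sketch
      below is not part of the original report; it reuses initframe, ccd and the
      functions._ import from the script, and replays the deduplicated changes
      against the initial rows with a full outer join:

      // Plain-Spark oracle (no CarbonData): apply the latest change per key
      // to the initial rows. Illustrative only -- not Carbon's merge API.
      val expected = initframe.as("t")
        .join(ccd.as("s"), col("t.key") === col("s.key"), "full_outer")
        .where(coalesce(col("s.deleted"), lit(false)) === lit(false)) // drop deleted keys a and b
        .select(
          coalesce(col("s.key"), col("t.key")).as("key"),             // existing or newly inserted key
          when(col("s.key").isNotNull, col("s.newValue"))             // updated/inserted value,
            .otherwise(col("t.value")).as("value"))                   // else the unchanged one
      expected.orderBy("key").show()

      This yields (c, 200), (d, 3), (e, 100), i.e. the local-mode output above,
      which suggests that under yarn the merge left the partitioned table
      untouched.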
      


          People

            Assignee: Unassigned
            Reporter: Sachin Ramachandra Setty (sachin1729)


              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 3h 20m