CarbonData / CARBONDATA-1076

Join Issue caused by dictionary and shuffle exchange


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Versions: 0.1.1-incubating, 1.1.0
    • Fix Version: 1.2.0
    • Component: core
    • Labels: None
    • Environment: Carbon + spark 2.1

    Description

      This issue can be reproduced with the following steps:

      Step1: create a carbon table

      carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3','TABLE_BLOCKSIZE'='4')")

      Step2: load data
      carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE carbon_table")

      Data in the file /opt/carbon_table is as follows:
      col1,col2,col3
      1,2,3
      4,5,6
      7,8,9

      Step3: do the query
      carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM carbon_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()

      [expected] Hive and Parquet tables both return the result below, which is correct.

      col1 col1 col3
      1 1 1
      4 4 1
      7 7 1

      [actual] Carbon returns nulls because rows are wrongly matched.

      col1 col1 col3
      1 null null
      null 4 1
      4 null null
      null 7 1
      7 null null
      null 1 1
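The shape of this wrong result is what a full outer join produces when no key on one side ever compares equal to any key on the other: every row comes back once per side, padded with nulls. A small Python sketch, independent of Spark and Carbon (the surrogate ids 100/101/102 are made up for illustration):

```python
def full_outer_join(left, right):
    """Naive full outer join over (key, value) pairs; quadratic, for illustration only."""
    matched = [(lv, rv) for lk, lv in left for rk, rv in right if lk == rk]
    left_only = [(lv, None) for lk, lv in left if all(rk != lk for rk, _ in right)]
    right_only = [(None, rv) for rk, rv in right if all(lk != rk for lk, _ in left)]
    return matched + left_only + right_only

# Left side still keyed by (hypothetical) dictionary ids, right side by decoded
# values: no key ever matches, so each of the 3 + 3 rows comes back null-padded,
# giving the same six-row null pattern as the wrong result above.
wrong_result = full_outer_join(
    [(100, 1), (101, 4), (102, 7)],   # encoded col1 -> decoded col1
    [(1, 1), (4, 1), (7, 1)],         # decoded col1 -> count(col2)
)
```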

      Root cause analysis:

      The query contains two subqueries: one performs the dictionary decode after the shuffle exchange, while the other decodes before the exchange. The two join inputs are therefore hash-partitioned on different key representations (encoded vs. decoded values), so matching rows can land in different partitions and the full join produces wrong matches.

      My idea: could we move the decode before the exchange? I am not very familiar with the Carbon query engine, so any ideas are welcome.
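The suspected mechanism can be sketched in a few lines of Python (the dictionary mapping is invented for illustration, not Carbon's actual encoder): when one join input is hash-partitioned on the still-encoded surrogate id and the other on the decoded value, the same logical key is routed to different partitions, so the sort-merge join never gets to compare the matching rows.

```python
# Hypothetical dictionary: decoded value -> surrogate id.
DICT = {1: 100, 4: 101, 7: 102}
NUM_PARTITIONS = 4

def partition(key):
    """Same hash partitioner applied to both join inputs."""
    return hash(key) % NUM_PARTITIONS

def encoded_partition(value):
    """Partition a row lands in when shuffled *before* dictionary decode."""
    return partition(DICT[value])

def decoded_partition(value):
    """Partition a row lands in when shuffled *after* dictionary decode."""
    return partition(value)

for v in (1, 4, 7):
    print(v, encoded_partition(v), decoded_partition(v))
```

With this dictionary no partition ever holds both copies of the same key. Decoding both sides before the exchange, as suggested above, would make the partitioning keys agree on both join inputs.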

      Plan as follows:

      == Physical Plan ==
      SortMergeJoin [col1#3445], [col1#3460], FullOuter
      :- Sort [col1#3445 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(col1#3445, 200)
      :     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
      :        +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445])
      :           +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
      :              +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445, col2#3446])
      :                 +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3445,col2#3446]
      +- Sort [col1#3460 ASC NULLS FIRST], false, 0
         +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
            +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)], output=[col1#3460, col3#3436L])
               +- Exchange hashpartitioning(col1#3460, 200)
                  +- HashAggregate(keys=[col1#3460], functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
                     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
                        +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3460,col2#3461]


            People

              Assignee: Ravindra Pesala (ravi.pesala)
              Reporter: chenerlu (erlu)


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 40m