[SPARK-23076] When we call cache() on an RDD which depends on ShuffledRowRDD, we will get an incorrect result


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      For the query below:

      select * from csv_demo limit 3;
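
      (For reference, a hypothetical setup for csv_demo: a headerless CSV registered as a view, so the columns get the default names _c0 and _c1. The path and registration here are assumptions, not from this report.)

      // Hypothetical setup sketch; the path is illustrative only
      val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
      spark.read.csv("/tmp/csv_demo").createOrReplaceTempView("csv_demo")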
      

      The correct result should be:
      0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
      +----------+------+
      | _c0      | _c1  |
      +----------+------+
      | Joe      | 20   |
      | Tom      | 30   |
      | Hyukjin  | 25   |
      +----------+------+
      However, when we call cache() on the MapPartitionsRDD shown in the attached shufflerowrdd-cache.png (approximated by the sketch below):
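      (A rough user-level approximation of that cache() call, a sketch only: queryExecution.toRdd stands in for the MapPartitionsRDD in the screenshot, and internalRdd is our name.)

      // Sketch: cache the internal RDD[InternalRow] downstream of the shuffle.
      // Its elements are UnsafeRow objects that the deserializer reuses.
      val df = spark.sql("select * from csv_demo")
      val internalRdd = df.queryExecution.toRdd
      internalRdd.cache()
      internalRdd.count() // materializes the cache, storing references to the reused rows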

      Then the result is wrong:
      0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
      +----------+------+
      | _c0      | _c1  |
      +----------+------+
      | Hyukjin  | 25   |
      | Hyukjin  | 25   |
      | Hyukjin  | 25   |
      +----------+------+
      The reason this happens is that the UnsafeRows produced by ShuffledRowRDD#compute share the same underlying byte buffer: the shuffle deserializer hands out a single row object and rewrites its bytes in place for each record.

      I printed some logs to demonstrate this, by modifying UnsafeRow.toString() so that it also prints baseObject and baseOffset:

      // This is for debugging
      @Override
      public String toString() {
        StringBuilder build = new StringBuilder("[");
        for (int i = 0; i < sizeInBytes; i += 8) {
          if (i != 0) build.append(',');
          build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
        }
        build.append(","+baseObject+","+baseOffset+']'); // Print baseObject and baseOffset here
        return build.toString();
      }
      2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
      2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
      2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
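
      All three log lines report the same baseObject ([B@6225ec90) and baseOffset (16): the deserializer hands back one row object and rewrites its bytes between records. The hazard is generic to any iterator that reuses a mutable record; the following minimal, self-contained Scala sketch (no Spark; Record and the demo names are ours) reproduces the symptom:

      // A deserializer-style iterator that reuses one mutable record,
      // the way the shuffle deserializer reuses a single UnsafeRow.
      final class Record(var value: String) {
        override def toString: String = value
      }

      object ReusedRecordDemo {
        def main(args: Array[String]): Unit = {
          val rows = Seq("Joe 20", "Tom 30", "Hyukjin 25")
          val reused = new Record("")
          def it: Iterator[Record] = rows.iterator.map { s =>
            reused.value = s // overwrite in place instead of allocating
            reused           // hand out the same object every time
          }

          // Buffering by reference, which is what cache()/toArray effectively does:
          println(it.toArray.mkString(", "))
          // prints: Hyukjin 25, Hyukjin 25, Hyukjin 25

          // Copying each element first restores the expected contents:
          println(it.map(r => new Record(r.value)).toArray.mkString(", "))
          // prints: Joe 20, Tom 30, Hyukjin 25
        }
      }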
      

      We can fix this by adding a config, and copying each UnsafeRow read from the ShuffledRowRDD iterator when the config is enabled, like below:

      // Inside ShuffledRowRDD#compute; UNSAFEROWRDD_CACHE_ENABLE is the proposed new config.
      reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map { x =>
        if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE) &&
            x._2.isInstanceOf[UnsafeRow]) {
          x._2.asInstanceOf[UnsafeRow].copy()
        } else {
          x._2
        }
      }
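
      For context, this issue was resolved as Not A Problem: Spark SQL's internal iterators are allowed to reuse InternalRow objects, so callers that buffer rows (cache, collect, toArray) are expected to copy them first. A user-level workaround in that spirit (a sketch; df is assumed to be a Dataset over csv_demo, and no new config is required):

      // Sketch: copy each reused row before caching the internal RDD.
      // InternalRow.copy() materializes a standalone row per record.
      val safeRdd = df.queryExecution.toRdd.map(_.copy())
      safeRdd.cache()

      Caching at the Dataset level (df.cache()) also avoids the problem, since the in-memory columnar cache copies row contents out of the reused rows instead of storing the row objects themselves.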
      

      Attachments

        1. shufflerowrdd-cache.png (18 kB, uploaded by zhoukang)


          People

            Assignee: Unassigned
            Reporter: zhoukang (cane)
            Votes: 0
            Watchers: 3
