Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 2.1.0
Fix Version/s: None
Component/s: None
Description
For the query below:
select * from csv_demo limit 3;
The correct result should be:
0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Joe      | 20   |
| Tom      | 30   |
| Hyukjin  | 25   |
+----------+------+--+
However, when we call cache() on the MapPartitionsRDD, as sketched below:
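A minimal sketch of this, assuming csv_demo is a CSV-backed table and the internal row RDD is cached directly (the exact code may differ):

// Hypothetical reproduction sketch: cache the internal RDD[InternalRow]
// (built through a MapPartitionsRDD chain) without copying its rows.
val df = spark.sql("select * from csv_demo limit 3")
df.queryExecution.toRdd.cache()
// On re-read, every cached row aliases the same reused buffer:
df.queryExecution.toRdd.collect().foreach(println)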
the result is wrong:
0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Hyukjin  | 25   |
| Hyukjin  | 25   |
| Hyukjin  | 25   |
+----------+------+--+
The reason this happens is that the UnsafeRow objects generated by ShuffledRowRDD#compute all reuse the same underlying byte buffer.
I added some logging to show this. Modified UnsafeRow.toString():
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
    if (i != 0) build.append(',');
    build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
  }
  // Print baseObject and baseOffset here
  build.append("," + baseObject + "," + baseOffset + ']');
  return build.toString();
}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
Note that all three rows report the same baseObject ([B@6225ec90) and baseOffset (16), so they share one buffer, and the cache ends up holding three references to whatever that buffer last contained (Hyukjin, 25). We can fix this by adding a config and copying each UnsafeRow read by the ShuffledRowRDD iterator when the config is true, like below:
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(x => {
  if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE) &&
      x._2.isInstanceOf[UnsafeRow]) {
    // Copy the row so cached values do not alias the reused buffer
    x._2.asInstanceOf[UnsafeRow].copy()
  } else {
    x._2
  }
})
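StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE is not an existing Spark config; it is the entry proposed here. A sketch of how it could be declared in org.apache.spark.sql.internal.StaticSQLConf, with an assumed key and default:

// Hypothetical config entry for the proposed fix (not in upstream Spark).
val UNSAFEROWRDD_CACHE_ENABLE =
  buildStaticConf("spark.sql.unsafeRowRDD.cache.enable")
    .doc("When true, ShuffledRowRDD copies each UnsafeRow it reads, so the " +
      "resulting rows can be cached safely.")
    .booleanConf
    .createWithDefault(false)

That said, the issue was resolved as Not A Problem: Spark reuses row buffers by design, and callers that cache internal rows are expected to copy them first, e.g. df.queryExecution.toRdd.map(_.copy()).cache().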