Details
- Sub-task
- Status: Closed
- Major
- Resolution: Fixed
- None
- None
Description
Secondary key sort is currently implemented in GlobalRearrangeConverter#convert, and it costs two shuffles: the first in repartitionAndSortWithinPartitions and the second in groupBy.
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
        POGlobalRearrangeSpark physicalOperator) throws IOException {
    ....
    if (predecessors.size() == 1) {
        // GROUP
        JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
        if (physicalOperator.isUseSecondaryKey()) {
            RDD<Tuple> rdd = predecessors.get(0);
            RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(
                    new ToKeyNullValueFunction(),
                    SparkUtil.<Tuple, Object>getTuple2Manifest());
            JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(
                    rddPair,
                    SparkUtil.getManifest(Tuple.class),
                    SparkUtil.getManifest(Object.class));
            // first sort the tuple by secondary key if enable useSecondaryKey sort
            JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                    new HashPartitioner(parallelism),
                    new PigSecondaryKeyComparatorSpark(
                            physicalOperator.getSecondarySortOrder())); // first shuffle
            JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
            prdd = mapped.groupBy(new GetKeyFunction(physicalOperator),
                    parallelism); // second shuffle
        } else {
            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
            prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
        }
        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
        return jrdd2.rdd();
    }
    ....
}
We can optimize this to avoid the second shuffle, following the approach used in https://github.com/tresata/spark-sorted.
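As a rough illustration of how the groupBy shuffle might be avoided, the sketch below groups adjacent records that share a key while streaming through one already-sorted partition. It is plain Java with no Spark dependency, and `SortedGroupIterator` is a hypothetical name used only for this example. It is not the API of Pig or of spark-sorted. It assumes that the partitioner places all records with the same grouping key in the same partition and that the sort leaves them contiguous. Whether that holds depends on partitioning by the grouping key alone rather than by the full compound key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical helper for illustration only; not part of Pig or spark-sorted.
// Groups consecutive entries with equal keys from an iterator whose entries
// are assumed to be already sorted so that equal keys are adjacent.
final class SortedGroupIterator<K, V> implements Iterator<Map.Entry<K, List<V>>> {
    private final Iterator<Map.Entry<K, V>> input;
    // First record of the next group, or null when the input is exhausted.
    private Map.Entry<K, V> pending;

    SortedGroupIterator(Iterator<Map.Entry<K, V>> input) {
        this.input = input;
        this.pending = input.hasNext() ? input.next() : null;
    }

    @Override
    public boolean hasNext() {
        return pending != null;
    }

    @Override
    public Map.Entry<K, List<V>> next() {
        if (pending == null) {
            throw new NoSuchElementException();
        }
        K key = pending.getKey();
        List<V> values = new ArrayList<>();
        values.add(pending.getValue());
        pending = null;
        // Consume records until the key changes; values keep their sorted order.
        while (input.hasNext()) {
            Map.Entry<K, V> entry = input.next();
            if (Objects.equals(entry.getKey(), key)) {
                values.add(entry.getValue());
            } else {
                pending = entry; // this record starts the next group
                break;
            }
        }
        return new SimpleEntry<>(key, values);
    }
}
```

In a Spark job, an iterator of this kind could be applied inside mapPartitions directly after repartitionAndSortWithinPartitions, so that grouping happens locally within each partition and no second shuffle is needed. Each group's values are buffered in a list here, so a very large group would still need a streaming variant.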