Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Description
In PIG-4565 (Support custom MR partitioners for Spark engine), we refined the code of GlobalRearrangeConverter in PIG-4565_2.patch: it uses the Spark "cogroup" API to implement the "group by" and "join" cases, except for the "group by + secondary sort" case. (A minimal join-via-cogroup sketch follows the patch excerpt below.)
From PIG-4565_2.patch:
GlobalRearrangeConverter.java
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                          POGlobalRearrangeSpark op) throws IOException {
    SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
    int parallelism = SparkUtil.getParallelism(predecessors, op);

    // TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
    // vs using groupBy (like we do in this commented code), vs using
    // reduceByKey(). This is a pending task in Pig on Spark Milestone 1.
    // Once we figure that out, we can allow custom partitioning for
    // secondary sort case as well.
    // if (predecessors.size() == 1) {
    //     // GROUP BY
    //     JavaPairRDD<Object, Iterable<Tuple>> prdd;
    //     if (op.isUseSecondaryKey()) {
    //         prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
    //     } else {
    //         JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
    //         prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
    //         prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
    //                 parallelism));
    //     }
    //     JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
    //     return jrdd2.rdd();
    // }
    //
    // if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
    //     return handleSecondarySort(predecessors.get(0), op, parallelism);
    // }

    if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
        return handleSecondarySort(predecessors.get(0), op, parallelism);
    }

    List<RDD<Tuple2<Object, Tuple>>> rddPairs =
            new ArrayList<RDD<Tuple2<Object, Tuple>>>();
    for (RDD<Tuple> rdd : predecessors) {
        JavaRDD<Tuple> jrdd =
                JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
        JavaRDD<Tuple2<Object, Tuple>> rddPair =
                jrdd.map(new ToKeyValueFunction());
        rddPairs.add(rddPair.rdd());
    }

    // Something's wrong with the type parameters of CoGroupedRDD
    // key and value are the same type ???
    CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
            (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
                    .asScalaBuffer(rddPairs).toSeq()),
            SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism));

    RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
            (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
    return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
}
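For readers less familiar with the cogroup-based approach, here is a minimal, self-contained sketch (not part of the patch; the class name and data are illustrative, and it assumes the Spark 1.x Java API with Java 8 lambdas) of how a two-input join falls out of Spark's cogroup. This is the same idea convert() above realizes for N inputs with CoGroupedRDD, and the Partitioner argument is the hook where a custom partitioner plugs in:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CogroupJoinSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "cogroup-join-sketch");

        JavaPairRDD<String, Integer> left = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
        JavaPairRDD<String, Integer> right = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 10), new Tuple2<>("a", 11)));

        // cogroup gathers, per key, one Iterable of values from each input;
        // the Partitioner (a plain HashPartitioner here, but a custom one
        // works the same way) controls how keys are distributed.
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogrouped =
                left.cogroup(right, new HashPartitioner(2));

        // An inner join is the per-key cross product of the two Iterables.
        JavaPairRDD<String, Tuple2<Integer, Integer>> joined =
                cogrouped.flatMapValues(pair -> {
                    List<Tuple2<Integer, Integer>> out = new ArrayList<>();
                    for (Integer l : pair._1()) {
                        for (Integer r : pair._2()) {
                            out.add(new Tuple2<>(l, r));
                        }
                    }
                    return out;
                });

        System.out.println(joined.collect());  // e.g. [(a,(1,10)), (a,(1,11))]
        sc.stop();
    }
}

The converter generalizes this shape by keying each input with ToKeyValueFunction and flattening the per-key Iterables with ToGroupKeyValueFunction.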
Actually, we can also use the Spark "cogroup" API to implement the "secondary sort + group by" case; a rough sketch of that idea follows.
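The sketch below is an illustration only, not the committed implementation; names and data are illustrative, and it assumes the Spark 1.x Java API with Java 8 lambdas. A one-input CoGroupedRDD behaves like groupByKey, so the grouping can run on the main key alone (leaving room for a custom partitioner), and the secondary order can then be imposed on each group's values. Note that this buffers and sorts each group in memory, which is exactly the kind of tradeoff the TODO in the patch alludes to:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CogroupSecondarySortSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "cogroup-secondary-sort-sketch");

        // Records keyed by the main key, carrying (secondaryKey, value).
        JavaPairRDD<String, Tuple2<Integer, String>> input = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", new Tuple2<>(2, "y")),
                new Tuple2<>("a", new Tuple2<>(1, "x")),
                new Tuple2<>("b", new Tuple2<>(1, "z"))));

        // Group on the main key only; a custom Partitioner could be plugged
        // in here, just as in the non-secondary-sort cases.
        JavaPairRDD<String, Iterable<Tuple2<Integer, String>>> grouped =
                input.groupByKey(new HashPartitioner(2));

        // Impose the secondary order inside each group (in memory).
        JavaPairRDD<String, List<Tuple2<Integer, String>>> sorted =
                grouped.mapValues(values -> {
                    List<Tuple2<Integer, String>> list = new ArrayList<>();
                    for (Tuple2<Integer, String> v : values) {
                        list.add(v);
                    }
                    list.sort(Comparator.comparing((Tuple2<Integer, String> t) -> t._1()));
                    return list;
                });

        System.out.println(sorted.collect());  // a -> [(1,x),(2,y)], b -> [(1,z)]
        sc.stop();
    }
}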
Attachments
Issue Links