
PIG-4577: Use "cogroup" Spark API to implement the "groupby+secondarysort" case in GlobalRearrangeConverter.java


Details

    • Type: Sub-task
    • Parent: PIG-4059 (Pig on Spark)
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: spark
    • Labels: None

    Description

      In PIG-4565 (Support custom MR partitioners for Spark engine), we refined the code of GlobalRearrangeConverter in PIG-4565_2.patch, using the "cogroup" Spark API to implement the "groupby" and "join" cases, with the exception of the "groupby+secondarysort" case:
      GlobalRearrangeConverter.java

       @Override
       public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                                 POGlobalRearrangeSpark op) throws IOException {
           SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
           int parallelism = SparkUtil.getParallelism(predecessors, op);

           // TODO: Figure out the tradeoff of using CoGroupedRDD (even for 1 input),
           // vs using groupBy (like we do in this commented code), vs using
           // reduceByKey(). This is a pending task in Pig on Spark Milestone 1.
           // Once we figure that out, we can allow custom partitioning for the
           // secondary sort case as well.
           //
           // if (predecessors.size() == 1) {
           //     // GROUP BY
           //     JavaPairRDD<Object, Iterable<Tuple>> prdd;
           //     if (op.isUseSecondaryKey()) {
           //         prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
           //     } else {
           //         JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
           //         prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
           //         prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
           //                 parallelism));
           //     }
           //     JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
           //     return jrdd2.rdd();
           // }

           // The "groupby+secondarysort" case still bypasses the cogroup path below.
           if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
               return handleSecondarySort(predecessors.get(0), op, parallelism);
           }

           // Map every predecessor to (key, tuple) pairs so the inputs can be cogrouped.
           List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
           for (RDD<Tuple> rdd : predecessors) {
               JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
               JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
               rddPairs.add(rddPair.rdd());
           }

           // Something's wrong with the type parameters of CoGroupedRDD:
           // key and value are the same type???
           CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
                   (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
                           .asScalaBuffer(rddPairs).toSeq()),
                   SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism));

           RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
                   (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
           return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
       }
      
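      For reference, the cogroup path above can be reproduced with the plain Spark Java API. Below is a minimal, self-contained sketch of that flow; Strings stand in for Pig keys and Tuples, and the class name, variable names, and data are illustrative, not part of any patch:

      CogroupFlowSketch.java

          import java.util.Arrays;

          import org.apache.spark.SparkConf;
          import org.apache.spark.api.java.JavaPairRDD;
          import org.apache.spark.api.java.JavaSparkContext;

          import scala.Tuple2;

          // Illustrative only: mirrors the converter's "map each input to (key, value),
          // then cogroup everything" flow, with Strings standing in for Pig Tuples.
          public class CogroupFlowSketch {
              public static void main(String[] args) {
                  JavaSparkContext sc = new JavaSparkContext(
                          new SparkConf().setAppName("cogroup-flow").setMaster("local[2]"));

                  // Two predecessor "relations", already keyed as ToKeyValueFunction would do.
                  JavaPairRDD<String, String> left = sc.parallelizePairs(Arrays.asList(
                          new Tuple2<>("k1", "l1"), new Tuple2<>("k2", "l2")));
                  JavaPairRDD<String, String> right = sc.parallelizePairs(Arrays.asList(
                          new Tuple2<>("k1", "r1"), new Tuple2<>("k1", "r2")));

                  // cogroup shuffles both inputs by key and yields, per key, one bag per
                  // input: the layout ToGroupKeyValueFunction then flattens for Pig.
                  JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
                          left.cogroup(right);

                  // Prints each key with one bag per input.
                  grouped.collect().forEach(System.out::println);
                  sc.stop();
              }
          }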

      Actually, we can also use the "cogroup" Spark API to implement the "groupby+secondarysort" case.
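
      One possible shape for that, sketched below, is to keep cogroup for the shuffle and grouping, and then order each group's bag by the secondary key once the groups are assembled. This is only a sketch of the idea, not necessarily what PIG-4577.patch does: it uses the plain Java API, an empty second input to get single-input cogroup semantics, and Integers standing in for secondary keys, and it sorts each bag in memory after the shuffle (so it assumes each bag fits in memory, which a shuffle-time secondary sort avoids). All names and data are illustrative:

      CogroupSecondarySortSketch.java

          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.Collections;
          import java.util.List;

          import org.apache.spark.SparkConf;
          import org.apache.spark.api.java.JavaPairRDD;
          import org.apache.spark.api.java.JavaSparkContext;

          import scala.Tuple2;

          // Illustrative only: "groupby+secondarysort" riding on cogroup by sorting
          // each bag after the shuffle, instead of a separate handleSecondarySort path.
          public class CogroupSecondarySortSketch {
              public static void main(String[] args) {
                  JavaSparkContext sc = new JavaSparkContext(
                          new SparkConf().setAppName("cogroup-2ndsort").setMaster("local[2]"));

                  // (mainKey, secondaryKey) records; Integers stand in for the secondary key.
                  JavaPairRDD<String, Integer> input = sc.parallelizePairs(Arrays.asList(
                          new Tuple2<>("a", 3), new Tuple2<>("a", 1),
                          new Tuple2<>("b", 2), new Tuple2<>("a", 2)));

                  // A single-input cogroup (empty second input) behaves like a group-by.
                  JavaPairRDD<String, Integer> empty =
                          sc.parallelizePairs(Collections.<Tuple2<String, Integer>>emptyList());
                  JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> grouped =
                          input.cogroup(empty);

                  // Order each group's bag by the secondary key after the shuffle.
                  JavaPairRDD<String, List<Integer>> sorted = grouped.mapValues(bags -> {
                      List<Integer> values = new ArrayList<>();
                      bags._1().forEach(values::add);
                      Collections.sort(values);
                      return values;
                  });

                  // Prints (a,[1, 2, 3]) and (b,[2]): bags ordered by the secondary key.
                  sorted.collect().forEach(System.out::println);
                  sc.stop();
              }
          }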

      Attachments

        1. PIG-4577.patch (5 kB, liyunzhang)


            People

              Assignee: kellyzly liyunzhang
              Reporter: kellyzly liyunzhang
              Votes: 0
              Watchers: 3
