PIG-4553: Implement secondary sort using one shuffle
(Sub-task of PIG-4856: Optimization for pig on spark)

Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: spark
    • Labels: None

    Description

      Currently we implement secondary key sort in GlobalRearrangeConverter#convert
      using two shuffles: the first shuffle happens in repartitionAndSortWithinPartitions
      and the second in groupBy.

      public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
              POGlobalRearrangeSpark physicalOperator) throws IOException {
          ....
          if (predecessors.size() == 1) {
              // GROUP
              JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
              if (physicalOperator.isUseSecondaryKey()) {
                  RDD<Tuple> rdd = predecessors.get(0);
                  RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
                          SparkUtil.<Tuple, Object>getTuple2Manifest());

                  JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
                          SparkUtil.getManifest(Tuple.class),
                          SparkUtil.getManifest(Object.class));

                  // First shuffle: sort the tuples by the secondary key when
                  // useSecondaryKey is enabled.
                  JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
                          new HashPartitioner(parallelism),
                          new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
                  JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
                  // Second shuffle: group the sorted tuples by the main key.
                  prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
              } else {
                  JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
                  prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
              }

              JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
              return jrdd2.rdd();
          }
          ....
      }
      

      We can optimize this to use only one shuffle, following the approach in
      https://github.com/tresata/spark-sorted. A sketch of the idea is shown below.
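
      A minimal sketch of the single-shuffle idea, assuming the Spark 1.x Java API
      (FlatMapFunction returning an Iterable) and simple Integer/String keys and values
      instead of Pig tuples; the names SecondarySortSketch, PrimaryKeyPartitioner,
      CompositeKeyComparator and groupWithSecondarySort are illustrative and not part of
      the Pig code base. The composite key is partitioned on the primary key only but
      sorted on (primary, secondary), so a single repartitionAndSortWithinPartitions
      makes each group contiguous and internally sorted within its partition; the
      grouping is then done map-side in mapPartitions, with no second shuffle:

      import java.io.Serializable;
      import java.util.ArrayList;
      import java.util.Comparator;
      import java.util.Iterator;
      import java.util.List;

      import org.apache.spark.HashPartitioner;
      import org.apache.spark.Partitioner;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.function.FlatMapFunction;

      import scala.Tuple2;

      public class SecondarySortSketch {

          // Partition on the primary key only, so all records of one group land in
          // the same partition regardless of their secondary key.
          static class PrimaryKeyPartitioner extends Partitioner {
              private final HashPartitioner delegate;

              PrimaryKeyPartitioner(int partitions) {
                  delegate = new HashPartitioner(partitions);
              }

              @Override
              public int numPartitions() {
                  return delegate.numPartitions();
              }

              @Override
              public int getPartition(Object key) {
                  // hash only the primary component of the composite key
                  return delegate.getPartition(((Tuple2<?, ?>) key)._1());
              }
          }

          // Order by (primary, secondary) so each group is contiguous and internally
          // sorted by the secondary key within its partition.
          static class CompositeKeyComparator
                  implements Comparator<Tuple2<Integer, Integer>>, Serializable {
              @Override
              public int compare(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                  int c = a._1().compareTo(b._1());
                  return c != 0 ? c : a._2().compareTo(b._2());
              }
          }

          // Walk each sorted partition once and emit (primaryKey, values) groups by
          // collecting adjacent records that share the same primary key.
          static class GroupAdjacentFunction implements
                  FlatMapFunction<Iterator<Tuple2<Tuple2<Integer, Integer>, String>>,
                          Tuple2<Integer, Iterable<String>>> {
              @Override
              public Iterable<Tuple2<Integer, Iterable<String>>> call(
                      Iterator<Tuple2<Tuple2<Integer, Integer>, String>> it) {
                  List<Tuple2<Integer, Iterable<String>>> groups =
                          new ArrayList<Tuple2<Integer, Iterable<String>>>();
                  Integer current = null;
                  List<String> values = new ArrayList<String>();
                  while (it.hasNext()) {
                      Tuple2<Tuple2<Integer, Integer>, String> rec = it.next();
                      Integer primary = rec._1()._1();
                      if (current != null && !current.equals(primary)) {
                          groups.add(new Tuple2<Integer, Iterable<String>>(current, values));
                          values = new ArrayList<String>();
                      }
                      current = primary;
                      values.add(rec._2());
                  }
                  if (current != null) {
                      groups.add(new Tuple2<Integer, Iterable<String>>(current, values));
                  }
                  return groups;
              }
          }

          // Single shuffle: repartitionAndSortWithinPartitions does both the
          // redistribution and the secondary sort; the grouping happens map-side.
          public static JavaPairRDD<Integer, Iterable<String>> groupWithSecondarySort(
                  JavaPairRDD<Tuple2<Integer, Integer>, String> input, int parallelism) {
              JavaPairRDD<Tuple2<Integer, Integer>, String> sorted =
                      input.repartitionAndSortWithinPartitions(
                              new PrimaryKeyPartitioner(parallelism),
                              new CompositeKeyComparator());
              return JavaPairRDD.fromJavaRDD(sorted.mapPartitions(new GroupAdjacentFunction()));
          }
      }

      This is essentially the pattern spark-sorted builds on; applying it in
      GlobalRearrangeConverter would let the secondary-key case drop the groupBy
      and its extra shuffle.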

      Attachments

        1. PIG-4553_1.patch (13 kB, liyunzhang)
        2. PIG-4553_2.patch (13 kB, liyunzhang)

            People

              Assignee: liyunzhang (kellyzly)
              Reporter: liyunzhang (kellyzly)
              Votes: 0
              Watchers: 3
