Hive / HIVE-7526

Research using the groupBy transformation to replace Hive's existing partitionByKey and SparkCollector combination

    Details

    • Type: Task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.1.0
    • Component/s: Spark
    • Labels: None

      Description

      Currently SparkClient shuffles data by calling partitionByKey(). This transformation outputs <key, value> tuples. However, Hive's ExecReducer expects <key, iterator<value>> tuples, and Spark's groupByKey() appears to output exactly that. By using groupByKey, we may therefore be able to drop Hive's own key-clustering mechanism (in HiveReduceFunction). This task is to try that approach.
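      The difference in output shape can be sketched in plain Java without Spark. In this hypothetical, in-memory model (the class and method names are illustrative only, not Spark or Hive code), `partitionOnly` routes each <key, value> pair to a partition by hashing its key, while `groupByKey` also collects the values of each key, giving the <key, values> shape the reducer consumes. In Spark's Java API the corresponding call would be `JavaPairRDD.groupByKey(int numPartitions)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a shuffle; not Spark or Hive code.
final class ShuffleShapes {
    private ShuffleShapes() {}

    // Route a key to a partition: non-negative hashCode modulo numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Partition-only shuffle: each partition holds flat <key, value> pairs,
    // so equal keys share a partition but are not brought together.
    static <K, V> List<List<Map.Entry<K, V>>> partitionOnly(
            List<Map.Entry<K, V>> input, int numPartitions) {
        List<List<Map.Entry<K, V>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> e : input) {
            parts.get(partitionFor(e.getKey(), numPartitions)).add(e);
        }
        return parts;
    }

    // groupByKey-style shuffle: each partition maps a key to all of its values.
    static <K, V> List<Map<K, List<V>>> groupByKey(
            List<Map.Entry<K, V>> input, int numPartitions) {
        List<Map<K, List<V>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new LinkedHashMap<>());
        }
        for (Map.Entry<K, V> e : input) {
            parts.get(partitionFor(e.getKey(), numPartitions))
                 .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                 .add(e.getValue());
        }
        return parts;
    }
}
```

      With `partitionOnly`, the reduce side still has to cluster equal keys itself (the role HiveReduceFunction plays); with `groupByKey`, that work moves into the shuffle.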

      1. HIVE-7526.2.patch
        3 kB
        Chao Sun
      2. HIVE-7526.3.patch
        11 kB
        Chao Sun
      3. HIVE-7526.4-spark.patch
        11 kB
        Chao Sun
      4. HIVE-7526.5-spark.patch
        13 kB
        Xuefu Zhang
      5. HIVE-7526.patch
        4 kB
        Chao Sun

          Activity

          Xuefu Zhang added a comment -

          Rui Li,

          "It seems that in SparkShuffler we lose the number of partitions when applying the shuffle transformations. It may be useful if the user can specify it (e.g. HIVE-7540). Should we add that to the 'shuffle' method?"

          Yes, please feel free to make the required changes. The number of partitions should come from SparkEdgeProperty, which is calculated automatically in HIVE-7567. cc Brock Noland

          Brock Noland added a comment -

          Hi Rui Li,

          I might be misunderstanding, but it looks like it's calculated automatically in HIVE-7567.

          Rui Li added a comment -

          Hi Xuefu Zhang, Chao Sun, it seems that in SparkShuffler we lose the number of partitions when applying the shuffle transformations. It may be useful if the user can specify it (e.g. HIVE-7540). Should we add that to the "shuffle" method?

          Xuefu Zhang added a comment -

          Yes. Patch committed to spark branch. Thanks for the catch, Lefty Leverenz.

          Lefty Leverenz added a comment -

          "Patch is committed to trunk. Thanks to Chao for the patch."

          You meant committed to the Spark branch, right?

          Xuefu Zhang added a comment -

          Yeah. I made the same mistake again: I left out the new files. I have fixed it.

          Rui Li added a comment -

          Hi Xuefu Zhang, do you mean you committed patch #5? I checked the latest code on GitHub but cannot find SparkShuffler or its implementing classes.

          Xuefu Zhang added a comment -

          I manually tested patch #5, and the basic query works. I left the sorting-related implementation (commented in the code) to Rui, since he is working on sorting and is waiting for this.

          Patch is committed to trunk. Thanks to Chao for the patch.

          Xuefu Zhang added a comment -

          Chao, thanks for your latest patch. I took the liberty of updating it for the following reasons:

          1. Your patch wasn't based on the latest branch, so a rebase was needed.
          2. Some license headers were missing or had been removed.
          3. More importantly, we shouldn't cache all rows in a list of lists to do the sortBy shuffle, because memory use would be unbounded. The returned iterator should instead be backed by the input iterator. I put in code stubs for this.
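          Point 3 can be sketched in Java as a lazy grouping iterator. This is a hypothetical helper (the class name and the plain `java.util` types are illustrative, not Hive's `HiveKey`/`BytesWritable` classes). It assumes the sortBy shuffle has already delivered each partition sorted by key, so equal keys arrive consecutively; groups are then formed by looking one pair ahead in the input iterator, and each group's values are read straight from that iterator instead of being cached in a list of lists. Memory use therefore stays constant no matter how many rows share a key.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical helper: turns a key-sorted iterator of <key, value> pairs into
// <key, values> groups without caching rows. Only one lookahead pair is held.
// Precondition: input is sorted (or at least clustered) by key.
final class KeyClusteringIterator<K, V> implements Iterator<Map.Entry<K, Iterator<V>>> {
    private final Iterator<Map.Entry<K, V>> input;
    private Map.Entry<K, V> lookahead;  // next unread pair, or null when input is exhausted
    private K currentKey;               // key of the most recently returned group
    private boolean hasCurrent;         // whether any group has been returned yet
    private long generation;            // bumped per group so stale value iterators stop

    KeyClusteringIterator(Iterator<Map.Entry<K, V>> input) {
        this.input = input;
        advance();
    }

    private void advance() {
        lookahead = input.hasNext() ? input.next() : null;
    }

    private boolean lookaheadHasKey(K key) {
        return lookahead != null && Objects.equals(lookahead.getKey(), key);
    }

    @Override
    public boolean hasNext() {
        // Skip whatever the caller did not read from the current group.
        if (hasCurrent) {
            while (lookaheadHasKey(currentKey)) {
                advance();
            }
        }
        return lookahead != null;
    }

    @Override
    public Map.Entry<K, Iterator<V>> next() {
        if (!hasNext()) throw new NoSuchElementException();
        final K key = lookahead.getKey();
        final long gen = ++generation;
        currentKey = key;
        hasCurrent = true;
        // Values are pulled lazily from the shared input iterator.
        Iterator<V> values = new Iterator<V>() {
            @Override
            public boolean hasNext() {
                return gen == generation && lookaheadHasKey(key);
            }

            @Override
            public V next() {
                if (!hasNext()) throw new NoSuchElementException();
                V v = lookahead.getValue();
                advance();
                return v;
            }
        };
        return new AbstractMap.SimpleImmutableEntry<>(key, values);
    }
}
```

          The price of not caching is that groups must be consumed in order: moving to the next key discards any values of the current key that were not read, and an earlier group's value iterator reports no more elements once the caller has moved on.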

          Chao Sun added a comment -

          Hi Xuefu Zhang, I took your suggestions and put up another patch. Please take a look. Thanks.

          Xuefu Zhang added a comment -

          Chao, based on our last conversation, I don't think your patch is final or ready for review. Please continue working on it and update the patch when you think it's ready. Here is what I emphasized:

          1. Define a SparkShuffle interface that's similar to the existing ShuffleTran.
          2. Have two implementations of this interface: sortBy and groupBy.
          3. For sortBy, use a local key-clustering mechanism.
          4. Have ReduceTran contain references to a SparkShuffle instance and a HiveReduceFunction instance.

          Let me know if you have additional questions.
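          The four points above can be modeled in plain Java as a sketch. Everything here is hypothetical: `Shuffle`, `GroupByShuffle`, `SortByShuffle` and `ReduceTran` only echo the names in the comment, in-memory lists stand in for Spark RDDs, and a `BiFunction` stands in for HiveReduceFunction. Partitioning uses a non-negative hash modulo `numPartitions`. For brevity, `SortByShuffle` only sorts within each partition and materializes its groups; a real sortBy shuffle would stream groups off the sorted input rather than build lists.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of the proposed design; names echo the comment
// (SparkShuffle, ReduceTran, HiveReduceFunction) but none of this is Hive code.
interface Shuffle<K extends Comparable<K>, V> {
    // Returns numPartitions partitions, each a list of <key, values> groups.
    List<List<Map.Entry<K, List<V>>>> shuffle(List<Map.Entry<K, V>> input, int numPartitions);

    // Non-negative hashCode modulo numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

// groupBy flavor: the shuffle itself collects each key's values.
final class GroupByShuffle<K extends Comparable<K>, V> implements Shuffle<K, V> {
    @Override
    public List<List<Map.Entry<K, List<V>>>> shuffle(List<Map.Entry<K, V>> input, int numPartitions) {
        List<Map<K, List<V>>> maps = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) maps.add(new LinkedHashMap<>());
        for (Map.Entry<K, V> e : input) {
            maps.get(Shuffle.partitionFor(e.getKey(), numPartitions))
                .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                .add(e.getValue());
        }
        List<List<Map.Entry<K, List<V>>>> out = new ArrayList<>();
        for (Map<K, List<V>> m : maps) out.add(new ArrayList<>(m.entrySet()));
        return out;
    }
}

// sortBy flavor: pairs are sorted by key within each partition, then equal
// (now adjacent) keys are clustered locally in a single pass.
final class SortByShuffle<K extends Comparable<K>, V> implements Shuffle<K, V> {
    @Override
    public List<List<Map.Entry<K, List<V>>>> shuffle(List<Map.Entry<K, V>> input, int numPartitions) {
        List<List<Map.Entry<K, V>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Map.Entry<K, V> e : input) {
            parts.get(Shuffle.partitionFor(e.getKey(), numPartitions)).add(e);
        }
        List<List<Map.Entry<K, List<V>>>> out = new ArrayList<>();
        for (List<Map.Entry<K, V>> part : parts) {
            part.sort((a, b) -> a.getKey().compareTo(b.getKey()));
            List<Map.Entry<K, List<V>>> groups = new ArrayList<>();
            Map.Entry<K, List<V>> last = null;
            for (Map.Entry<K, V> e : part) {
                if (last == null || last.getKey().compareTo(e.getKey()) != 0) {
                    last = new AbstractMap.SimpleEntry<K, List<V>>(e.getKey(), new ArrayList<V>());
                    groups.add(last);
                }
                last.getValue().add(e.getValue());
            }
            out.add(groups);
        }
        return out;
    }
}

// ReduceTran holds a Shuffle plus a reduce function (a stand-in for
// HiveReduceFunction) and applies the function to every shuffled group.
final class ReduceTran<K extends Comparable<K>, V, R> {
    private final Shuffle<K, V> shuffle;
    private final BiFunction<K, List<V>, R> reduceFn;
    private final int numPartitions;

    ReduceTran(Shuffle<K, V> shuffle, BiFunction<K, List<V>, R> reduceFn, int numPartitions) {
        this.shuffle = shuffle;
        this.reduceFn = reduceFn;
        this.numPartitions = numPartitions;
    }

    List<R> transform(List<Map.Entry<K, V>> input) {
        List<R> results = new ArrayList<>();
        for (List<Map.Entry<K, List<V>>> partition : shuffle.shuffle(input, numPartitions)) {
            for (Map.Entry<K, List<V>> group : partition) {
                results.add(reduceFn.apply(group.getKey(), group.getValue()));
            }
        }
        return results;
    }
}
```

          Because ReduceTran only sees the `Shuffle` interface, switching between the groupBy and sortBy flavors does not touch the reduce logic.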

          Brock Noland added a comment -

          Thank you, Chao Sun! Could you upload the patch to https://reviews.apache.org and post the link here?

          Chao Sun added a comment -

          An attempt to fix the last patch by moving the groupBy operation into ShuffleTran.
          Also, since SparkTran::transform may now have input/output value types other than BytesWritable, it needs to be made generic as well.

          Also added a CompTran class, which is essentially a composition of transformations. It offers better type compatibility than ChainedTran.

          This is NOT the perfect solution, and it may be subject to further change.
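          The type-compatibility point can be illustrated with a small generic Java model. These are hypothetical interfaces, not the classes in the patch (which operate on Spark RDDs). Giving a transformation separate input and output type parameters lets the compiler check that each step's output feeds the next step's input, even when the value type changes, as it does after a groupBy. For contrast, `SameTypeChain` models a chain whose steps all share one type; that ChainedTran had this restriction is an assumption here, not something the comment states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not the patch's classes): a transformation whose input
// and output types are separate type parameters, so a step may change the
// value type, e.g. from single values to grouped values.
interface Tran<I, O> {
    O transform(I input);
}

// Composition of two transformations. The compiler checks that the output
// type of `first` (M) is exactly the input type of `second`.
final class CompTran<I, M, O> implements Tran<I, O> {
    private final Tran<I, M> first;
    private final Tran<M, O> second;

    CompTran(Tran<I, M> first, Tran<M, O> second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public O transform(I input) {
        return second.transform(first.transform(input));
    }
}

// For contrast: a chain restricted to one type T can only hold steps that
// map T to T, so it cannot express a type-changing step such as a groupBy.
final class SameTypeChain<T> implements Tran<T, T> {
    private final List<Tran<T, T>> steps = new ArrayList<>();

    SameTypeChain<T> add(Tran<T, T> step) {
        steps.add(step);
        return this;
    }

    @Override
    public T transform(T input) {
        T current = input;
        for (Tran<T, T> step : steps) {
            current = step.transform(current);
        }
        return current;
    }
}
```

          With `split` of type `Tran<String, List<String>>` and `count` of type `Tran<List<String>, Integer>`, `new CompTran<>(split, count)` compiles, while the mismatched `new CompTran<>(count, split)` is rejected at compile time rather than failing at run time.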

          Chao Sun added a comment -

          Updated HiveReduceFunction and ReduceTran.

          Xuefu Zhang added a comment -

          Chao Sun, the patch looks fine, though it's outdated. Please rebase on the latest branch. You might need to make additional changes.

          Chao Sun added a comment -

          This patch changes partitionByKey() to groupByKey().
          Note two things:
          1. spark.serializer has to be set to Kryo for it to work;
          2. The Scala doc says the grouped values in the JavaPairRDD are a List, but in fact they are an Iterable.
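          On the two notes: the Kryo setting corresponds to `spark.serializer=org.apache.spark.serializer.KryoSerializer` in the Spark configuration. On the second, Spark's Java API declares `JavaPairRDD.groupByKey()` as returning `JavaPairRDD<K, Iterable<V>>`, so reduce-side code should rely only on `Iterable`. A minimal sketch with hypothetical helpers: `sum` never casts to `List` or calls `size()`/`get()`, so it works the same on a `List` and on a lazily produced `Iterable`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class IterableValues {
    private IterableValues() {}

    // Hypothetical reduce-side helper: relies only on Iterable, never casting
    // to List or calling size()/get(), so any Iterable of values works.
    static long sum(Iterable<? extends Number> values) {
        long total = 0;
        for (Number n : values) {
            total += n.longValue();
        }
        return total;
    }

    // An Iterable that is not a List: values are produced on demand.
    static Iterable<Integer> range(int from, int to) {
        return () -> new Iterator<Integer>() {
            private int cursor = from;

            @Override
            public boolean hasNext() {
                return cursor < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return cursor++;
            }
        };
    }
}
```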


            People

            • Assignee: Chao Sun
            • Reporter: Xuefu Zhang
            • Votes: 0
            • Watchers: 5
