Hive / HIVE-7526

Research using the groupBy transformation to replace Hive's existing partitionByKey and SparkCollector combination

    Details

    • Type: Task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: Spark
    • Labels:
      None

      Description

      Currently, SparkClient shuffles data by calling partitionByKey(). This transformation outputs <key, value> tuples. However, Hive's ExecMapper expects <key, iterator<value>> tuples, and Spark's groupByKey() appears to output this shape directly. Thus, by using groupByKey(), we may be able to avoid Hive's own key clustering mechanism (in HiveReduceFunction). This task is to try that approach.
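The difference between the two tuple shapes can be sketched in plain Java. This is only an illustration under stated assumptions: `GroupByKeySketch`, `pair`, and this local `groupByKey` are hypothetical names, and ordinary collections stand in for Spark RDDs. It is not Spark's or Hive's implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: plain Java collections stand in for Spark RDDs.
final class GroupByKeySketch {

  // Convenience factory for a <key, value> pair.
  static <K, V> Map.Entry<K, V> pair(K key, V value) {
    return new SimpleEntry<K, V>(key, value);
  }

  // Turns flat <key, value> pairs into <key, Iterable<value>> entries,
  // preserving the order in which keys first appear.
  static <K, V> Map<K, Iterable<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
    Map<K, List<V>> groups = new LinkedHashMap<>();
    for (Map.Entry<K, V> p : pairs) {
      groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
    }
    // Widen List<V> to Iterable<V>, the shape a reduce-side consumer iterates over.
    return new LinkedHashMap<K, Iterable<V>>(groups);
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> flat =
        Arrays.asList(pair("a", 1), pair("b", 2), pair("a", 3));
    System.out.println(groupByKey(flat));
  }
}
```

With grouped output, the consumer receives each key once along with all of its values, so it no longer has to detect key boundaries itself.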

      1. HIVE-7526.patch
        4 kB
        Chao Sun
      2. HIVE-7526.2.patch
        3 kB
        Chao Sun
      3. HIVE-7526.3.patch
        11 kB
        Chao Sun
      4. HIVE-7526.4-spark.patch
        11 kB
        Chao Sun
      5. HIVE-7526.5-spark.patch
        13 kB
        Xuefu Zhang

          Activity

          Xuefu Zhang created issue -
          Xuefu Zhang made changes -
          Field Original Value New Value
          Link This issue is part of HIVE-7292 [ HIVE-7292 ]
          Xuefu Zhang made changes -
          Link This issue is related to HIVE-7493 [ HIVE-7493 ]
          Xuefu Zhang made changes -
          Assignee Chao [ csun ]
          Chao Sun added a comment -

          This patch changes partitionByKey() to groupByKey().
          Note two things:
          1. spark.serializer has to be set to Kryo in order to make it work;
          2. Scala doc says JavaPairRDD encapsulates a List, but in fact it's an Iterable.

          Chao Sun made changes -
          Attachment HIVE-7526.patch [ 12658278 ]
          Xuefu Zhang added a comment -

          Chao Sun, the patch looks fine, though it's outdated. Please rebase onto the latest branch. You might need to make additional changes.

          Chao Sun added a comment -

          Updated HiveReduceFunction and ReduceTran.

          Chao Sun made changes -
          Attachment HIVE-7526.2.patch [ 12658587 ]
          Chao Sun added a comment -

          An attempt to fix the last patch by moving the groupBy op to ShuffleTran.
          Also, since SparkTran::transform may now have input/output value types other than BytesWritable, we need to make it generic as well.

          Also added a CompTran class, which is basically a composition of transformations. It offers better type compatibility than ChainedTran.

          This is NOT the perfect solution, and may be subject to further change.

          Chao Sun made changes -
          Attachment HIVE-7526.3.patch [ 12658714 ]
          Brock Noland added a comment -

          Thank you, Chao Sun! May I ask you to upload the patch to https://reviews.apache.org and post the link here?

          Xuefu Zhang added a comment -

          Chao, based on our last conversation, I don't think your patch is final or ready to be reviewed. Please continue working on your patch and update it when you think it's ready. Here is what I emphasized:

          1. Define a SparkShuffle interface that's similar to existing ShuffleTran.
          2. Have two implementations of this interface: sortBy and groupBy.
          3. For sortBy, use a local key clustering mechanism.
          4. Have ReduceTran contain a reference to SparkShuffle and HiveReduceFunction instance.

          Let me know if you have additional questions.
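The four points above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code in any of the patches: every name here (`ShuffleSketch`, `Shuffler`, `GroupByShuffler`, `SortByShuffler`, `ReduceStep`, `sumPerKey`) is hypothetical, plain lists stand in for Spark RDDs, and the sortBy variant materializes its groups in memory purely for brevity.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// All names are hypothetical; plain lists stand in for Spark RDDs.
final class ShuffleSketch {

  // Point 1: a shuffle abstraction that yields <key, values> groups.
  interface Shuffler<K extends Comparable<K>, V> {
    List<Map.Entry<K, List<V>>> shuffle(List<Map.Entry<K, V>> input);
  }

  // Point 2, groupBy variant: groups by key without ordering the keys.
  static final class GroupByShuffler<K extends Comparable<K>, V> implements Shuffler<K, V> {
    @Override
    public List<Map.Entry<K, List<V>>> shuffle(List<Map.Entry<K, V>> input) {
      Map<K, List<V>> groups = new LinkedHashMap<>();
      for (Map.Entry<K, V> pair : input) {
        groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
      }
      return new ArrayList<>(groups.entrySet());
    }
  }

  // Points 2 and 3, sortBy variant: sorts by key, then clusters adjacent equal keys locally.
  static final class SortByShuffler<K extends Comparable<K>, V> implements Shuffler<K, V> {
    @Override
    public List<Map.Entry<K, List<V>>> shuffle(List<Map.Entry<K, V>> input) {
      List<Map.Entry<K, V>> sorted = new ArrayList<>(input);
      sorted.sort(Map.Entry.comparingByKey());
      List<Map.Entry<K, List<V>>> out = new ArrayList<>();
      for (Map.Entry<K, V> pair : sorted) {
        boolean startsNewGroup = out.isEmpty()
            || out.get(out.size() - 1).getKey().compareTo(pair.getKey()) != 0;
        if (startsNewGroup) {
          out.add(new SimpleEntry<K, List<V>>(pair.getKey(), new ArrayList<V>()));
        }
        out.get(out.size() - 1).getValue().add(pair.getValue());
      }
      return out;
    }
  }

  // Point 4: the reduce step holds a reference to a shuffler and a reduce function.
  static final class ReduceStep<K extends Comparable<K>, V, R> {
    private final Shuffler<K, V> shuffler;
    private final BiFunction<K, List<V>, R> reduceFunction;

    ReduceStep(Shuffler<K, V> shuffler, BiFunction<K, List<V>, R> reduceFunction) {
      this.shuffler = shuffler;
      this.reduceFunction = reduceFunction;
    }

    List<R> transform(List<Map.Entry<K, V>> input) {
      List<R> results = new ArrayList<>();
      for (Map.Entry<K, List<V>> group : shuffler.shuffle(input)) {
        results.add(reduceFunction.apply(group.getKey(), group.getValue()));
      }
      return results;
    }
  }

  // Sums the values per key over a fixed sample input, using the given shuffler.
  static List<String> sumPerKey(Shuffler<String, Integer> shuffler) {
    List<Map.Entry<String, Integer>> input = Arrays.asList(
        new SimpleEntry<String, Integer>("b", 2),
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("b", 3));
    ReduceStep<String, Integer, String> step = new ReduceStep<>(shuffler,
        (key, values) -> key + "=" + values.stream().mapToInt(Integer::intValue).sum());
    return step.transform(input);
  }

  public static void main(String[] args) {
    System.out.println(sumPerKey(new GroupByShuffler<String, Integer>()));
    System.out.println(sumPerKey(new SortByShuffler<String, Integer>()));
  }
}
```

Keeping the shuffle behind one interface lets the reduce step stay unaware of whether its groups came from a groupBy or a sortBy, which appears to be the intent of point 4.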

          Chao Sun added a comment -

          Hi Xuefu Zhang, I took your suggestions and prepared another patch. Please take a look. Thanks.

          Chao Sun made changes -
          Attachment HIVE-7526.4-spark.patch [ 12658841 ]
          Xuefu Zhang made changes -
          Attachment HIVE-7546.5-spark.patch [ 12658864 ]
          Xuefu Zhang made changes -
          Attachment HIVE-7526.5-spark.patch [ 12658865 ]
          Xuefu Zhang made changes -
          Attachment HIVE-7546.5-spark.patch [ 12658864 ]
          Xuefu Zhang added a comment -

          Chao, thanks for your latest patch. I took the liberty of updating your patch due to the following:

          1. Your patch wasn't updated to the latest branch. A rebase was needed.
          2. License headers were missing or had been removed.
          3. More importantly, we shouldn't use a list of lists to cache all rows in order to do the sortBy shuffle, because memory use would be unbounded. We should be able to back the returned iterator with the input iterator. I put in code stubs for this.
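One way to back the returned iterator with the input iterator, as point 3 describes, is to keep a single pair of lookahead and hand out a values iterator that stops at the next key boundary. The sketch below is hypothetical (`ClusteredIterator` and `render` are invented names, not the stubs in the patch) and assumes non-null keys and input that is already sorted, or at least clustered, by key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch; assumes non-null keys and input already clustered by key.
final class ClusteredIterator<K, V> implements Iterator<Map.Entry<K, Iterator<V>>> {
  private final Iterator<Map.Entry<K, V>> input;
  private Map.Entry<K, V> lookahead;  // next unread pair, or null at end of input
  private K openKey;                  // key of the group handed out most recently
  private boolean groupOpen = false;
  private int generation = 0;         // bumped to invalidate stale value iterators

  ClusteredIterator(Iterator<Map.Entry<K, V>> input) {
    this.input = input;
    advance();
  }

  private void advance() {
    lookahead = input.hasNext() ? input.next() : null;
  }

  // Note: calling this closes the current group, so consume a group's values first.
  @Override
  public boolean hasNext() {
    if (groupOpen) {
      // Skip any values of the previous group that the caller left unread.
      while (lookahead != null && lookahead.getKey().equals(openKey)) {
        advance();
      }
      groupOpen = false;
      generation++;
    }
    return lookahead != null;
  }

  @Override
  public Map.Entry<K, Iterator<V>> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final K key = lookahead.getKey();
    final int myGeneration = generation;
    openKey = key;
    groupOpen = true;
    // The values iterator reads straight from the input; no rows are cached.
    Iterator<V> values = new Iterator<V>() {
      @Override
      public boolean hasNext() {
        return myGeneration == generation && lookahead != null
            && lookahead.getKey().equals(key);
      }

      @Override
      public V next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        V value = lookahead.getValue();
        advance();
        return value;
      }
    };
    return new SimpleEntry<K, Iterator<V>>(key, values);
  }

  // Drains the groups into strings such as "a:[1, 2]", one group at a time.
  static <K, V> List<String> render(Iterator<Map.Entry<K, V>> sortedInput) {
    List<String> out = new ArrayList<>();
    Iterator<Map.Entry<K, Iterator<V>>> groups = new ClusteredIterator<>(sortedInput);
    while (groups.hasNext()) {
      Map.Entry<K, Iterator<V>> group = groups.next();
      List<V> values = new ArrayList<>();
      while (group.getValue().hasNext()) {
        values.add(group.getValue().next());
      }
      out.add(group.getKey() + ":" + values);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> sorted = Arrays.asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("a", 2),
        new SimpleEntry<String, Integer>("b", 3));
    System.out.println(render(sorted.iterator()));
  }
}
```

Only one pair is held at a time, so memory use stays constant regardless of how many rows share a key. The trade-off is that each group's values can be read only once and only before the next group is requested.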

          Xuefu Zhang made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Xuefu Zhang added a comment -

          I manually tested patch #5, and a basic query works. I left the sorting implementation (commented out in the code) to Rui, as he is working on sorting and waiting for this.

          Patch is committed to trunk. Thanks to Chao for the patch.

          Xuefu Zhang made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Fix Version/s spark-branch [ 12327352 ]
          Resolution Fixed [ 1 ]
          Rui Li added a comment -

          Hi Xuefu Zhang, do you mean you committed patch #5? I checked the latest code on GitHub but cannot find SparkShuffler and its implementing classes.

          Xuefu Zhang added a comment -

          Yeah. I made the same mistake again: missing new files. I have fixed it.

          Lefty Leverenz added a comment -

          "Patch is committed to trunk. Thanks to Chao for the patch."

          You meant committed to Spark branch, right?

          Xuefu Zhang added a comment -

          Yes. Patch committed to spark branch. Thanks for the catch, Lefty Leverenz.

          Rui Li added a comment -

          Hi Xuefu Zhang and Chao Sun, it seems that in SparkShuffler we lose the number of partitions when applying the shuffle transformations. It may be useful if the user can specify it (e.g. HIVE-7540). Should we add that to the "shuffle" method?

          Brock Noland added a comment -

          Hi Rui Li,

          I might be misunderstanding, but it looks like it's calculated automatically in HIVE-7567.

          Xuefu Zhang added a comment -

          Rui Li,

          "It seems that in SparkShuffler we lose the number of partitions when applying the shuffle transformations. It may be useful if the user can specify it (e.g. HIVE-7540). Should we add that to the "shuffle" method?"

          Yes, please feel free to make the required changes. The number of partitions should come from SparkEdgeProperty, which is calculated automatically in HIVE-7567. cc Brock Noland
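As a rough illustration of what passing the partition count into the "shuffle" method might look like, here is a small hash-partitioning sketch. It is an assumption-laden example rather than the SparkShuffler code: `PartitionedShuffleSketch` and its `shuffle` signature are hypothetical, plain lists stand in for Spark partitions, and the count is simply a parameter (in Hive it would come from SparkEdgeProperty, per the comment above).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; plain lists stand in for Spark partitions.
final class PartitionedShuffleSketch {

  // Distributes pairs over numPartitions buckets so that equal keys share a bucket.
  static <K, V> List<List<Map.Entry<K, V>>> shuffle(
      List<Map.Entry<K, V>> input, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    List<List<Map.Entry<K, V>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<Map.Entry<K, V>>());
    }
    for (Map.Entry<K, V> pair : input) {
      // floorMod keeps the index non-negative even for negative hash codes.
      int index = Math.floorMod(pair.getKey().hashCode(), numPartitions);
      partitions.get(index).add(pair);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = Arrays.asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("b", 2),
        new SimpleEntry<String, Integer>("a", 3));
    System.out.println(shuffle(input, 2));
  }
}
```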


            People

            • Assignee:
              Chao Sun
              Reporter:
              Xuefu Zhang
            • Votes:
              0
              Watchers:
              5
