Uploaded image for project: 'Hive'
  1. Hive
  2. HIVE-8118

Support work that have multiple child works to work around SPARK-3622 [Spark Branch]

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.1.0
    • Spark

    Description

      In the current implementation, both SparkMapRecordHandler and SparkReduceRecorderHandler takes only one result collector, which limits that the corresponding map or reduce task can have only one child. It's very comment in multi-insert queries where a map/reduce task has more than one children. A query like the following has two map tasks as parents:

      select name, sum(value) from dec group by name union all select name, value from dec order by name
      

      It's possible in the future an optimation may be implemented so that a map work is followed by two reduce works and then connected to a union work.

      Thus, we should take this as a general case. Tez is currently providing a collector for each child operator in the map-side or reduce side operator tree. We can take Tez as a reference.

      Spark currently doesn't have a tranformation that supports mutliple output datasets from a single input dataset (SPARK-3622). This is a workaround for this gap.

      Likely this is a big change and subtasks are possible.

      With this, we can have a simpler and clean multi-insert implementation. This is also the problem observed in HIVE-7731 and HIVE-7503.

      Attachments

        1. HIVE-8118.pdf
          112 kB
          Xuefu Zhang

        Issue Links

          There are no Sub-Tasks for this issue.

          Activity

            chengxiang li Chengxiang Li added a comment -

            Actually, we could generate a spark graph with one map RDD followed by multi reduce RDDs, it should not related with SparkMapRecordHandler and SparkReduceRecorderHandler, we could wrap each reduce side child operator with a separate HiveReduceFunction in SparkCompiler level.
            For a map RDD which is followed by two reduce RDDs and then connected to a union RDD, Spark would compute map RDD twice unless map RDD is cached. If two reduce share the same shuffle dependency(which means they have same map output partitions), the job could be optimized to compute map RDD only once theoretically, but i think this should be an Spark framework level optimization. while two reduce RDDs don't share the same shuffle dependency, map RDD would be computed twice anyway.
            For multi-insert case, if we wrap all FileSinkOperators into one RDD, parent of FileSinkOperator would forward rows to each FileSinkOperator, so the data source for insert would be only generated once.
            so I think we do not really need multiple result collectors for SparkMapRecorderHandler and SparkReduceRecordHandler.

            chengxiang li Chengxiang Li added a comment - Actually, we could generate a spark graph with one map RDD followed by multi reduce RDDs, it should not related with SparkMapRecordHandler and SparkReduceRecorderHandler, we could wrap each reduce side child operator with a separate HiveReduceFunction in SparkCompiler level. For a map RDD which is followed by two reduce RDDs and then connected to a union RDD, Spark would compute map RDD twice unless map RDD is cached. If two reduce share the same shuffle dependency(which means they have same map output partitions), the job could be optimized to compute map RDD only once theoretically, but i think this should be an Spark framework level optimization. while two reduce RDDs don't share the same shuffle dependency, map RDD would be computed twice anyway. For multi-insert case, if we wrap all FileSinkOperators into one RDD, parent of FileSinkOperator would forward rows to each FileSinkOperator, so the data source for insert would be only generated once. so I think we do not really need multiple result collectors for SparkMapRecorderHandler and SparkReduceRecordHandler.
            xuefuz Xuefu Zhang added a comment - - edited

            Hi chengxiang li,

            Thank you for your input. I'm not sure if I understand your thought right. Let me clarify the problem by giving a SparkWork like this:

            MapWork1 -> ReduceWork1
                      \-> ReduceWork2
            

            it means that MapWork1 will generate different datasets to feed to ReduceWork1 and ReduceWork2. In case of multi-insert, ReduceWork1 and ReduceWork2 will have a FS operator. Inside MapWork1, there will be two operator branches consuming the same data, and push different data sets to two RS operators. (ReduceWork1 and ReduceWork2 have different HiveReduceFunctions.)

            However, current implemenation only takes the first data set and feed it to both reduce works. The same problem can happen also if MapWork1 were a reduce work following other ReduceWork or MapWork.

            With this problem, I'm not sure how we can get around without letting MapWork1 generate two output RDDs, one for each following reduce work. Potentially, we can duplicate MapWork1 and have the following diagram:

            MapWork11 -> ReduceWork1
            MapWork12 -> ReduceWork2
            

            where MapWork11 and MapWork12 consume the same input table (input table as RDD), and feed its first output RDD to ReduceWork1 and the second to ReduceWork2. This has its complexity, but more importantly, there will be wasted READ (unless SPark is smart enough to cache the input table, which is unlikely) and COMPUTATION (computing data twice). I feel that it's unlikely to get such optimizations from Spark framework in the near term.

            Thus, I think we have to take into consideration that a map work or a reduce work might generate multiple RDDs, one feeds to each of its children. Since SparkMapRecorderHandler and SparkReduceRecordHandler are doing the data processing on map and reduce side, they need to have a way to generate multiple outputs.

            Please correct me if I understood you wrong. Thanks.

            xuefuz Xuefu Zhang added a comment - - edited Hi chengxiang li , Thank you for your input. I'm not sure if I understand your thought right. Let me clarify the problem by giving a SparkWork like this: MapWork1 -> ReduceWork1 \-> ReduceWork2 it means that MapWork1 will generate different datasets to feed to ReduceWork1 and ReduceWork2. In case of multi-insert, ReduceWork1 and ReduceWork2 will have a FS operator. Inside MapWork1, there will be two operator branches consuming the same data, and push different data sets to two RS operators. (ReduceWork1 and ReduceWork2 have different HiveReduceFunctions.) However, current implemenation only takes the first data set and feed it to both reduce works. The same problem can happen also if MapWork1 were a reduce work following other ReduceWork or MapWork. With this problem, I'm not sure how we can get around without letting MapWork1 generate two output RDDs, one for each following reduce work. Potentially, we can duplicate MapWork1 and have the following diagram: MapWork11 -> ReduceWork1 MapWork12 -> ReduceWork2 where MapWork11 and MapWork12 consume the same input table (input table as RDD), and feed its first output RDD to ReduceWork1 and the second to ReduceWork2. This has its complexity, but more importantly, there will be wasted READ (unless SPark is smart enough to cache the input table, which is unlikely) and COMPUTATION (computing data twice). I feel that it's unlikely to get such optimizations from Spark framework in the near term. Thus, I think we have to take into consideration that a map work or a reduce work might generate multiple RDDs, one feeds to each of its children. Since SparkMapRecorderHandler and SparkReduceRecordHandler are doing the data processing on map and reduce side, they need to have a way to generate multiple outputs. Please correct me if I understood you wrong. Thanks.
            xuefuz Xuefu Zhang added a comment -

            I and chengxiang li had an offline discussion and there was just a little bit confusion on understanding the problem, and now we are in the same page. To summarize, the problem comes when a map work or reduce work is connected to multiple reduce works. Currently the a map work or reduce work is only wired with one collector, which collects all data regardless the branch. That data set feeds to all subsequent child reduce works.

            I also noted that Tez provides a <name, outputcollector> map to its recorder handlers. However, for us, we may not be able to do that, due to the limitations of Spark's RDD transformation APIs.

            xuefuz Xuefu Zhang added a comment - I and chengxiang li had an offline discussion and there was just a little bit confusion on understanding the problem, and now we are in the same page. To summarize, the problem comes when a map work or reduce work is connected to multiple reduce works. Currently the a map work or reduce work is only wired with one collector, which collects all data regardless the branch. That data set feeds to all subsequent child reduce works. I also noted that Tez provides a <name, outputcollector> map to its recorder handlers. However, for us, we may not be able to do that, due to the limitations of Spark's RDD transformation APIs.
            xuefuz Xuefu Zhang added a comment -

            Design doc is attached.

            xuefuz Xuefu Zhang added a comment - Design doc is attached.
            xuefuz Xuefu Zhang added a comment -

            All sub tasks are completed. Thus, this JIRA is closed as fixed as well.

            xuefuz Xuefu Zhang added a comment - All sub tasks are completed. Thus, this JIRA is closed as fixed as well.

            People

              csun Chao Sun
              xuefuz Xuefu Zhang
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: