HIVE-17225

HoS DPP pruning sink ops can target parallel work objects


Details

    • Type: Sub-task (parent: HIVE-16923, Hive-on-Spark DPP Improvements)
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: Spark
    • Labels: None

Description

      Setup:

      SET hive.spark.dynamic.partition.pruning=true;
      SET hive.strict.checks.cartesian.product=false;
      SET hive.auto.convert.join=true;
      
      CREATE TABLE partitioned_table1 (col int) PARTITIONED BY (part_col int);
      
      CREATE TABLE regular_table1 (col int);
      CREATE TABLE regular_table2 (col int);
      
      ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 1);
      ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 2);
      ALTER TABLE partitioned_table1 ADD PARTITION (part_col = 3);
      
      INSERT INTO TABLE regular_table1 VALUES (1), (2), (3), (4), (5), (6);
      INSERT INTO TABLE regular_table2 VALUES (1), (2), (3), (4), (5), (6);
      
      INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 1) VALUES (1);
      INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 2) VALUES (2);
      INSERT INTO TABLE partitioned_table1 PARTITION (part_col = 3) VALUES (3);
      
      SELECT *
      FROM   partitioned_table1,
             regular_table1 rt1,
             regular_table2 rt2
      WHERE  rt1.col = partitioned_table1.part_col
             AND rt2.col = partitioned_table1.part_col;
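
      Until a fix lands, one workaround (an untested sketch; it simply bypasses the DPP code path that fails below) is to disable dynamic partition pruning for the session, so the scan of partitioned_table1 never waits on pruning output:

      SET hive.spark.dynamic.partition.pruning=false;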
      

      Exception:

      2017-08-01T13:27:47,483 ERROR [b0d354a8-4cdb-4ba9-acec-27d14926aaf4 main] ql.Driver: FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
      	at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:408)
      	at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:498)
      	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:285)
      	at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:285)
      	at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:82)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:285)
      	at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:82)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
      	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:127)
      	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1.apply(AsyncRDDActions.scala:125)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
      	at org.apache.spark.rdd.AsyncRDDActions.foreachAsync(AsyncRDDActions.scala:125)
      	at org.apache.spark.api.java.JavaRDDLike$class.foreachAsync(JavaRDDLike.scala:731)
      	at org.apache.spark.api.java.AbstractJavaRDDLike.foreachAsync(JavaRDDLike.scala:45)
      	at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:351)
      	at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358)
      	at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
      	at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:147)
      	at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.prune(SparkDynamicPartitionPruner.java:76)
      	at org.apache.hadoop.hive.ql.io.HiveInputFormat.init(HiveInputFormat.java:406)
      	... 62 more
      Caused by: java.io.FileNotFoundException: File file:/Users/stakiar/Documents/idea/apache-hive/itests/qtest-spark/target/tmp/scratchdir/stakiar/b0d354a8-4cdb-4ba9-acec-27d14926aaf4/hive_2017-08-01_13-27-45_553_1088589686371686526-1/-mr-10004/3/5 does not exist
      	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:431)
      	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
      	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
      	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
      	at org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner.processFiles(SparkDynamicPartitionPruner.java:119)
      	... 64 more
      

      The explain plan for the query is:

      STAGE DEPENDENCIES:
        Stage-2 is a root stage
        Stage-1 depends on stages: Stage-2
        Stage-0 depends on stages: Stage-1
      
      STAGE PLANS:
        Stage: Stage-2
          Spark
            DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:39
            Vertices:
              Map 1 
                  Map Operator Tree:
                      TableScan
                        alias: partitioned_table1
                        Statistics: Num rows: 3 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                        Select Operator
                          expressions: col (type: int), part_col (type: int)
                          outputColumnNames: _col0, _col1
                          Statistics: Num rows: 3 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                          Spark HashTable Sink Operator
                            keys:
                              0 _col1 (type: int)
                              1 _col0 (type: int)
                              2 _col0 (type: int)
                  Local Work:
                    Map Reduce Local Work
              Map 3 
                  Map Operator Tree:
                      TableScan
                        alias: rt2
                        Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                        Filter Operator
                          predicate: col is not null (type: boolean)
                          Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                          Select Operator
                            expressions: col (type: int)
                            outputColumnNames: _col0
                            Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                            Spark HashTable Sink Operator
                              keys:
                                0 _col1 (type: int)
                                1 _col0 (type: int)
                                2 _col0 (type: int)
                            Select Operator
                              expressions: _col0 (type: int)
                              outputColumnNames: _col0
                              Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                              Group By Operator
                                keys: _col0 (type: int)
                                mode: hash
                                outputColumnNames: _col0
                                Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                                Spark Partition Pruning Sink Operator
                                  partition key expr: part_col
                                  Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                                  target column name: part_col
                                  target work: Map 1
                  Local Work:
                    Map Reduce Local Work
      
        Stage: Stage-1
          Spark
            DagName: stakiar_20170801202453_5376c59d-2eca-47ca-94bc-a3049f7fbd0a:38
            Vertices:
              Map 2 
                  Map Operator Tree:
                      TableScan
                        alias: rt1
                        Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                        Filter Operator
                          predicate: col is not null (type: boolean)
                          Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                          Select Operator
                            expressions: col (type: int)
                            outputColumnNames: _col0
                            Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                            Map Join Operator
                              condition map:
                                   Inner Join 0 to 1
                                   Inner Join 0 to 2
                              keys:
                                0 _col1 (type: int)
                                1 _col0 (type: int)
                                2 _col0 (type: int)
                              outputColumnNames: _col0, _col1, _col2, _col3
                              input vertices:
                                0 Map 1
                                2 Map 3
                              Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
                              File Output Operator
                                compressed: false
                                Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
                                table:
                                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  Local Work:
                    Map Reduce Local Work
      
        Stage: Stage-0
          Fetch Operator
            limit: -1
            Processor Tree:
              ListSink
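
      The plan shows the problem described in the summary: Map 1 (the scan of partitioned_table1, ending in a Spark HashTable Sink Operator) and Map 3 (whose Spark Partition Pruning Sink Operator declares "target work: Map 1") are both vertices of Stage-2, so they belong to the same Spark job and can run in parallel. When the job is submitted, split computation for Map 1 invokes the SparkDynamicPartitionPruner via HiveInputFormat.init before Map 3 has written the pruning output, which matches the FileNotFoundException above. Any query can be checked for this shape by running EXPLAIN and comparing stages:

      -- The query from the setup; in the resulting plan, compare the stage that
      -- contains the Spark Partition Pruning Sink Operator with the stage that
      -- contains its "target work" vertex (here both are in Stage-2).
      EXPLAIN
      SELECT *
      FROM   partitioned_table1,
             regular_table1 rt1,
             regular_table2 rt2
      WHERE  rt1.col = partitioned_table1.part_col
             AND rt2.col = partitioned_table1.part_col;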
      

Attachments

        1. HIVE-17225.6.patch (89 kB, Sahil Takiar)
        2. HIVE-17225.5.patch (72 kB, Sahil Takiar)
        3. HIVE-17225.4.patch (72 kB, Sahil Takiar)
        4. HIVE-17225.3.patch (72 kB, Sahil Takiar)
        5. HIVE-17225.2.patch (72 kB, Sahil Takiar)
        6. HIVE17225.1.patch (119 kB, Janaki Lahorani)


People

    • Assignee: Sahil Takiar
    • Reporter: Sahil Takiar
    • Votes: 0
    • Watchers: 5
