commit e2a0e30cb89ae02642d3966fa1c419b43deb96b9
Author: Sahil Takiar
Date:   Sat Jul 14 12:51:01 2018 -0500

    HIVE-20056: SparkPartitionPruner shouldn't be triggered by Spark tasks

diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 111e614ab8..0ee41c0898 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -85,6 +85,8 @@
   public static final String SPARK_OPTIMIZE_OPERATOR_TREE = "SparkOptimizeOperatorTree";
   public static final String SPARK_OPTIMIZE_TASK_TREE = "SparkOptimizeTaskTree";
   public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+  public static final String SPARK_DYNAMICALLY_PRUNE_PARTITIONS =
+      "SparkDynamicallyPrunePartitions.";
   public static final String FILE_MOVES = "FileMoves";
   public static final String LOAD_TABLE = "LoadTable";

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index 240fa09454..b9285accbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -30,9 +30,10 @@
 import java.util.Map;
 import java.util.Set;

-import com.clearspring.analytics.util.Preconditions;
-import javolution.testing.AssertionException;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,11 @@
  * The spark version of DynamicPartitionPruner.
  */
 public class SparkDynamicPartitionPruner {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkDynamicPartitionPruner.class);
+  private static final String CLASS_NAME = SparkDynamicPartitionPruner.class.getName();
+
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();

   private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>();

   private final BytesWritable writable = new BytesWritable();
@@ -74,8 +79,12 @@ public void prune(MapWork work, JobConf jobConf) throws HiveException, SerDeExce
       // Nothing to prune for this MapWork
       return;
     }
+    perfLogger.PerfLogBegin(CLASS_NAME,
+        PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
     processFiles(work, jobConf);
     prunePartitions(work);
+    perfLogger.PerfLogEnd(CLASS_NAME,
+        PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
   }

   public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
@@ -210,14 +219,11 @@ private void applyFilterToPartitions(
       Path p = it.next();
       PartitionDesc desc = work.getPathToPartitionInfo().get(p);
       Map<String, String> spec = desc.getPartSpec();
-      if (spec == null) {
-        throw new AssertionException("No partition spec found in dynamic pruning");
-      }
+      Preconditions.checkNotNull(spec, "No partition spec found in dynamic pruning");

       String partValueString = spec.get(columnName);
-      if (partValueString == null) {
-        throw new AssertionException("Could not find partition value for column: " + columnName);
-      }
+      Preconditions.checkNotNull(partValueString,
+          "Could not find partition value for column: " + columnName);

       Object partValue = converter.convert(partValueString);
       if (LOG.isDebugEnabled()) {
@@ -234,8 +240,7 @@ private void applyFilterToPartitions(
         LOG.info("Pruning path: " + p);
         it.remove();
         work.removePathToAlias(p);
-        // HIVE-12244 call currently ineffective
-        work.getPartitionDescs().remove(desc);
+        work.removePathToPartitionInfo(p);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705c78..001d0b0518 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -109,6 +109,9 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     try {
       for (BaseWork work : sparkWork.getAllWork()) {
+        // Run the SparkDynamicPartitionPruner; we run this here instead of inside the
+        // InputFormat so that we don't have to run pruning when creating a Record Reader
+        runDynamicPartitionPruner(work);
         perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
         SparkTran tran = generate(work, sparkWork);
         SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
@@ -127,6 +130,27 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     return sparkPlan;
   }

+  /**
+   * Run a {@link SparkDynamicPartitionPruner} on the given {@link BaseWork}. This method only
+   * runs the pruner if the work object is a {@link MapWork}. We do this here because we need to
+   * do it after all previous Spark jobs for the given query have completed, otherwise the input
+   * file for the pruner won't exist. We need to make sure this runs before we serialize the
+   * given work object to a file (so it can be read by individual tasks) because the pruner will
+   * mutate the work object by removing certain input paths.
+   *
+   * @param work the {@link BaseWork} to run the pruner on
+   */
+  private void runDynamicPartitionPruner(BaseWork work) {
+    if (work instanceof MapWork && HiveConf.isSparkDPPAny(jobConf)) {
+      SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
+      try {
+        pruner.prune((MapWork) work, jobConf);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   // Generate (possibly get from a cached result) parent SparkTran
   private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work)
       throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ec8527ef27..74d098411d 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -431,18 +431,6 @@ protected void init(JobConf job) {
     } else {
       mrwork = Utilities.getMapWork(job);
     }
-
-    // Prune partitions
-    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
-        && HiveConf.isSparkDPPAny(job)) {
-      SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner();
-      try {
-        pruner.prune(mrwork, job);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 }
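Editor's note, with a condensed sketch (illustrative only, not part of the patch): the change moves dynamic partition pruning out of HiveInputFormat.init(), which runs inside every Spark task whenever a record reader is created, and into SparkPlanGenerator on the driver, so each MapWork is pruned exactly once before it is serialized for the tasks. The sketch below combines the guard added in SparkPlanGenerator#runDynamicPartitionPruner with the PerfLogger bracketing the patch places inside SparkDynamicPartitionPruner#prune; the class and method names DppFlowSketch and pruneIfNeeded are invented here for illustration.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;

public class DppFlowSketch {
  // Illustrative sketch of the driver-side flow introduced by the patch. Pruning now
  // happens once on the driver, before the MapWork is serialized for the tasks, so the
  // InputFormat no longer triggers it while creating record readers.
  static void pruneIfNeeded(BaseWork work, JobConf jobConf) {
    if (work instanceof MapWork && HiveConf.isSparkDPPAny(jobConf)) {
      PerfLogger perfLogger = SessionState.getPerfLogger();
      // The patch itself places this begin/end pair inside SparkDynamicPartitionPruner#prune.
      perfLogger.PerfLogBegin("DppFlowSketch",
          PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
      try {
        new SparkDynamicPartitionPruner().prune((MapWork) work, jobConf);
      } catch (Exception e) {
        throw new RuntimeException(e);
      } finally {
        perfLogger.PerfLogEnd("DppFlowSketch",
            PerfLogger.SPARK_DYNAMICALLY_PRUNE_PARTITIONS + work.getName());
      }
    }
  }
}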