diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 13067df..44b5caf 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3574,6 +3574,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.spark.dynamic.partition.pruning", false, "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" + "to a temporary HDFS file, and read later for removing unnecessary partitions."), + SPARK_DYNAMIC_RUNTIMEFILTER_PRUNING( + "hive.spark.dynamic.runtimefilter.pruning", false, + "When dynamic runtime filter pruning is enabled, joins on non-partition keys will be processed by writing\n" + + "to a temporary HDFS file, and read later for filtering rows on those non-partition keys."), SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE( "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L, "Maximum total data size in dynamic pruning."), diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 1ce3ba6..d08e32d 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1537,6 +1537,7 @@ spark.only.query.files=spark_union_merge.q,\ # Unlike "miniSparkOnYarn.query.files" below, these tests only run # under Spark engine and only use TestMiniSparkOnYarnCliDriver. miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\ + spark_runtime_filter_pruning.q,\ spark_dynamic_partition_pruning.q,\ spark_dynamic_partition_pruning_2.q,\ spark_dynamic_partition_pruning_3.q,\ diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index aaf644a..4cfca58 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -61,6 +61,7 @@ enum OperatorType { RCFILEMERGE, MERGEJOIN, SPARKPRUNINGSINK, + SPARKRUNTIMEFILTERPRUNINGSINK, } struct Operator { diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java index a002348..f8bbeaa 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java @@ -37,7 +37,8 @@ ORCFILEMERGE(22), RCFILEMERGE(23), MERGEJOIN(24), - SPARKPRUNINGSINK(25); + SPARKPRUNINGSINK(25), + SPARKRUNTIMEFILTERPRUNINGSINK(26); private final int value; @@ -110,6 +111,8 @@ public static OperatorType findByValue(int value) { return MERGEJOIN; case 25: return SPARKPRUNINGSINK; + case 26: + return SPARKRUNTIMEFILTERPRUNINGSINK; default: return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 21ca04d..969affc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -38,12 +38,15 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorSparkRuntimeFilterPruningSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import 
org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator; import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.parse.spark.SparkRuntimeFilterPruningSinkOperator; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.AbstractVectorDesc; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; @@ -122,6 +125,7 @@ opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class); opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class); opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class); + opvec.put(SparkRuntimeFilterPruningSinkDesc.class, SparkRuntimeFilterPruningSinkOperator.class); opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class); opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class); opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class); @@ -133,6 +137,8 @@ vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class); vectorOpvec.put( SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class); + vectorOpvec.put( + SparkRuntimeFilterPruningSinkDesc.class, VectorSparkRuntimeFilterPruningSinkOperator.class); vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class); vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class); vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index 5d2c759..0df904b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -371,7 +371,7 @@ public static void findRoots(Operator op, Collection> roots) { * Remove the branch that contains the specified operator. Do nothing if there's no branching, * i.e. all the upstream operators have only one child. 
*/ - public static void removeBranch(SparkPartitionPruningSinkOperator op) { + public static void removeBranch(Operator op) { Operator child = op; Operator curr = op; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java index b194c5f..f86595b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hive.ql.exec.mr; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -36,6 +40,7 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { private static final Logger LOG = LoggerFactory.getLogger(ObjectCache.class.getName()); + public static Map dynamicValueRegistryMap = Collections.synchronizedMap(new HashMap()); @Override public void release(String key) { @@ -47,7 +52,11 @@ public void release(String key) { @Override public T retrieve(String key) throws HiveException { - return retrieve(key, null); + if (dynamicValueRegistryMap.size() != 0){ + return (T)dynamicValueRegistryMap.get(key); + } else{ + return null; + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/DynamicValueRegistrySpark.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/DynamicValueRegistrySpark.java new file mode 100644 index 0000000..2228677 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/DynamicValueRegistrySpark.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.util.Map; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.DynamicValueRegistry; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.tez.DynamicValueRegistryTez; +import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.runtime.api.LogicalInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DynamicValueRegistrySpark implements DynamicValueRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DynamicValueRegistryTez.class); + + public static class RegistryConfSpark extends RegistryConf { + public Configuration conf; + public BaseWork baseWork; + public Map inputs; + protected Map result = Collections.synchronizedMap(new HashMap()); + + public RegistryConfSpark(Configuration conf, BaseWork basework, Map result) { + super(); + this.conf = conf; + this.baseWork = basework; + this.result = result; + } + } + + protected Map values = Collections.synchronizedMap(new HashMap()); + + + @Override + public Object getValue(String key) { + if (!values.containsKey(key)) { + throw new IllegalStateException("Value does not exist in registry: " + key); + } + return values.get(key); + } + + protected void setValue(String key, Object value) { + values.put(key, value); + } + + @Override + public void init(RegistryConf conf) throws Exception { + DynamicValueRegistrySpark.RegistryConfSpark rct = (DynamicValueRegistrySpark.RegistryConfSpark) conf; + + for (String inputSourceName : rct.baseWork.getInputSourceToRuntimeValuesInfo().keySet()) { + LOG.info("Runtime value source: " + inputSourceName); + RuntimeValuesInfo runtimeValuesInfo = rct.baseWork.getInputSourceToRuntimeValuesInfo().get(inputSourceName); + + // Set up col expressions for the dynamic values using this input + List ois = new ArrayList(); + for (ExprNodeDesc expr : runtimeValuesInfo.getColExprs()) { + ois.add(expr.getWritableObjectInspector()); + } + + // Setup deserializer/obj inspectors for the incoming data source + Deserializer deserializer = ReflectionUtils.newInstance(runtimeValuesInfo.getTableDesc().getDeserializerClass(), null); + deserializer.initialize(rct.conf, runtimeValuesInfo.getTableDesc().getProperties()); + ObjectInspector inspector = deserializer.getObjectInspector(); + // Set up col expressions for the dynamic values using this input + List colExprEvaluators = new ArrayList(); + for (ExprNodeDesc expr : runtimeValuesInfo.getColExprs()) { + ExprNodeEvaluator exprEval = ExprNodeEvaluatorFactory.get(expr, null); + exprEval.initialize(inspector); + colExprEvaluators.add(exprEval); + } + + Object row = rct.result.get(inputSourceName); + for (int colIdx = 0; colIdx < ois.size(); ++colIdx) { + // Read each expression and save it to the value registry + ExprNodeEvaluator eval = colExprEvaluators.get(colIdx); + Object val = eval.evaluate(row); + setValue(runtimeValuesInfo.getDynamicValueIDs().get(colIdx), val); + } + + } + } +} diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRuntimeFilterPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRuntimeFilterPruner.java new file mode 100644 index 0000000..fd9cbcd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRuntimeFilterPruner.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.concurrent.Callable; +import java.util.Map; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Set; + + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ObjectCache; +import org.apache.hadoop.hive.ql.exec.ObjectCacheWrapper; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.DynamicValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; + + +public class SparkRuntimeFilterPruner { + private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeFilterPruner.class); + private final BytesWritable writable = new BytesWritable(); + protected Map result = Collections.synchronizedMap(new HashMap()); + protected ObjectCache dynamicValueCache; + protected List dynamicValueCacheKeys = new ArrayList(); + + public void prune(MapWork work, JobConf jobConf) throws HiveException, SerDeException { + Set sourceWorkIds = work.getRfSourceTableDescMap().keySet(); + if (sourceWorkIds.size() == 0) { + return; + } + ObjectInputStream in = null; + try { + Path baseDir = work.getTmpPathForRuntimeFilter(); + FileSystem fs = FileSystem.get(baseDir.toUri(), jobConf); + for (String id : sourceWorkIds) { + Path sourceDir = new Path(baseDir, id); + // only one file is expected under this source dir + for (FileStatus fileStatus : fs.listStatus(sourceDir)) { + // only one table desc is expected per source id: one table -> one TS -> one SparkRuntimeFilterPruningSinkOperator + List tables = work.getRfSourceTableDescMap().get(id); + Deserializer deserializer = ReflectionUtils.newInstance(tables.get(0).getDeserializerClass(), null); + deserializer.initialize(jobConf, tables.get(0).getProperties()); + + in = new ObjectInputStream(fs.open(fileStatus.getPath())); + + in.readUTF(); + // 
Read fields + while (in.available() > 0) { + writable.readFields(in); + Object row = deserializer.deserialize(writable); + result.put("Reducer " + id, row); + } + } + } + flushToRegistrySpark(jobConf, work); + } catch (Exception e) { + throw new HiveException(e); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + throw new HiveException("error while trying to close input stream", e); + } + } + } + + private void flushToRegistrySpark(JobConf jobConf, MapWork work) throws Exception { + // setup values registry + String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY; + String queryId = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID); + + dynamicValueCache = new ObjectCacheWrapper( + new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(), queryId); + + final DynamicValueRegistrySpark registrySpark = dynamicValueCache.retrieve(valueRegistryKey, + new Callable() { + @Override + public DynamicValueRegistrySpark call() { + return new DynamicValueRegistrySpark(); + } + }); + + dynamicValueCacheKeys.add(valueRegistryKey); + DynamicValueRegistrySpark.RegistryConfSpark registryConf = new DynamicValueRegistrySpark.RegistryConfSpark(jobConf, work, result); + registrySpark.init(registryConf); + // here the key is refer the ObjectCacheWrapper#makeKey method + org.apache.hadoop.hive.ql.exec.mr.ObjectCache.dynamicValueRegistryMap.put(queryId + "_" + valueRegistryKey, registrySpark); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkRuntimeFilterPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkRuntimeFilterPruningSinkOperator.java new file mode 100644 index 0000000..ec6ab83 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkRuntimeFilterPruningSinkOperator.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.vector; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; +import org.apache.hadoop.hive.ql.parse.spark.SparkRuntimeFilterPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; + +/** + * Vectorized version for SparkRuntimeFilterPruningSinkOperator. 
+ **/ +public class VectorSparkRuntimeFilterPruningSinkOperator extends SparkRuntimeFilterPruningSinkOperator{ + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + + protected transient boolean firstBatch; + + protected transient VectorExtractRow vectorExtractRow; + + protected transient Object[] singleRow; + + public VectorSparkRuntimeFilterPruningSinkOperator(CompilationOpContext ctx, + VectorizationContext context, OperatorDesc conf) { + this(ctx); + this.conf = (SparkRuntimeFilterPruningSinkDesc) conf; + this.vContext = context; + } + + /** Kryo ctor. */ + @VisibleForTesting + public VectorSparkRuntimeFilterPruningSinkOperator() { + super(); + } + + public VectorSparkRuntimeFilterPruningSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + inputObjInspectors[0] = + VectorizedBatchUtil.convertToStandardStructObjectInspector( + (StructObjectInspector) inputObjInspectors[0]); + super.initializeOp(hconf); + + firstBatch = true; + } + + @Override + public void process(Object data, int tag) throws HiveException { + VectorizedRowBatch batch = (VectorizedRowBatch) data; + if (firstBatch) { + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], + vContext.getProjectedColumns()); + singleRow = new Object[vectorExtractRow.getCount()]; + firstBatch = false; + } + + ObjectInspector rowInspector = inputObjInspectors[0]; + try { + Writable writableRow; + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical; + vectorExtractRow.extractRow(batch, batchIndex, singleRow); + writableRow = serializer.serialize(singleRow, rowInspector); + writableRow.write(buffer); + } + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c3b846c..089510d 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.spark.SparkRuntimeFilterPruner; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -434,7 +435,7 @@ protected void init(JobConf job) { // Prune partitions if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") - && HiveConf.isSparkDPPAny(job)) { + && HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner(); try { pruner.prune(mrwork, job); @@ -443,6 +444,16 @@ protected void init(JobConf job) { } } + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") + && HiveConf.getBoolVar(job, ConfVars.SPARK_DYNAMIC_RUNTIMEFILTER_PRUNING)) { + SparkRuntimeFilterPruner pruner = new SparkRuntimeFilterPruner(); + try { + pruner.prune(mrwork, job); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index bb7f69c..e493aab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.GenTezUtils.DynamicListContext; import org.apache.hadoop.hive.ql.parse.GenTezUtils.DynamicPartitionPrunerContext; @@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; import org.apache.hadoop.hive.ql.parse.SemiJoinHint; import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext; +import org.apache.hadoop.hive.ql.parse.spark.SparkRuntimeFilterPruningSinkOperator; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.DynamicValue; @@ -146,10 +148,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Obje } boolean semiJoin = parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION); - if (HiveConf.getVar(parseContext.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { - //TODO HIVE-16862: Implement a similar feature like "hive.tez.dynamic.semijoin.reduction" in hive on spark - semiJoin = false; - } for (DynamicListContext ctx : removerContext) { String column = ExprNodeDescUtils.extractColName(ctx.parent); @@ -214,18 +212,24 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Obje // Use the tableAlias to generate keyBaseAlias keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + colName; - Map> hints = parseContext.getSemiJoinHints(); - if (hints != null) { - // Create semijoin optimizations ONLY for hinted columns - semiJoinAttempted = processSemiJoinHints( - parseContext, ctx, hints, tableAlias, - internalColNameBuilder.toString(), colName, ts, - keyBaseAlias); + + if (HiveConf.getVar(parseContext.getConf(), + ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") ){ + semiJoinAttempted = generateSparkSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias, column); } else { - // fallback to regular logic + Map> hints = parseContext.getSemiJoinHints(); + if (hints != null) { + // Create semijoin optimizations ONLY for hinted columns + semiJoinAttempted = processSemiJoinHints( + parseContext, ctx, hints, tableAlias, + internalColNameBuilder.toString(), colName, ts, + keyBaseAlias); + } else { + // fallback to regular logic semiJoinAttempted = generateSemiJoinOperatorPlan( ctx, parseContext, ts, keyBaseAlias, internalColNameBuilder.toString(), colName, null); + } } } } @@ -465,6 +469,238 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars } } + private boolean generateSparkSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext, + TableScanOperator ts, String keyBaseAlias, String column) throws SemanticException { + + // we will put a fork in the plan at the source of the reduce sink + Operator parentOfRS = ctx.generator.getParentOperators().get(0); + + // we need the expr that generated the key of the reduce sink + ExprNodeDesc key = 
ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + + List keyExprs = new ArrayList(); + if (parentOfRS instanceof SelectOperator) { + // Make sure the semijoin branch is not on partition column. + String internalColName = null; + ExprNodeDesc exprNodeDesc = key; + // Find the ExprNodeColumnDesc + while (!(exprNodeDesc instanceof ExprNodeColumnDesc) && + (exprNodeDesc.getChildren() != null)) { + exprNodeDesc = exprNodeDesc.getChildren().get(0); + } + + if (exprNodeDesc instanceof ExprNodeColumnDesc) { + internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); + + ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS. + getColumnExprMap().get(internalColName))); + String colName = ExprNodeDescUtils.extractColName(colExpr); + + // Fetch the TableScan Operator. + Operator op = parentOfRS.getParentOperators().get(0); + while (op != null && !(op instanceof TableScanOperator)) { + op = op.getParentOperators().get(0); + } + assert op != null; + + Table table = ((TableScanOperator) op).getConf().getTableMetadata(); + if (table.isPartitionKey(colName)) { + // The column is partition column, skip the optimization. + return false; + } + } else { + // No column found! + // Bail out + return false; + } + } + keyExprs.add(key); + + // group by requires "ArrayList", don't ask. + ArrayList outputNames = new ArrayList(); + outputNames.add(HiveConf.getColumnInternalName(0)); + + // project the relevant key column + SelectDesc select = new SelectDesc(keyExprs, outputNames); + SelectOperator selectOp = + (SelectOperator) OperatorFactory.getAndMakeChild(select, + new RowSchema(parentOfRS.getSchema()), parentOfRS); + + // do a group by to aggregate min,max and bloom filter. + float groupByMemoryUsage = + HiveConf.getFloatVar(parseContext.getConf(), HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); + float memoryThreshold = + HiveConf.getFloatVar(parseContext.getConf(), + HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + ArrayList groupByExprs = new ArrayList(); + + // Add min/max and bloom filter aggregations + List aggFnOIs = new ArrayList(); + aggFnOIs.add(key.getWritableObjectInspector()); + ArrayList params = new ArrayList(); + params.add( + new ExprNodeColumnDesc(key.getTypeInfo(), outputNames.get(0), + "", false)); + + ArrayList aggs = new ArrayList(); + try { + AggregationDesc min = new AggregationDesc("min", + FunctionRegistry.getGenericUDAFEvaluator("min", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + AggregationDesc max = new AggregationDesc("max", + FunctionRegistry.getGenericUDAFEvaluator("max", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + AggregationDesc bloomFilter = new AggregationDesc("bloom_filter", + FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false), + params, false, Mode.PARTIAL1); + GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); + bloomFilterEval.setSourceOperator(selectOp); + bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)); + bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval); + aggs.add(min); + aggs.add(max); + aggs.add(bloomFilter); + } catch (SemanticException e) { + LOG.error("Error creating min/max aggregations on key", e); + throw new IllegalStateException("Error creating min/max aggregations on key", e); + } + + // Create the Group by Operator + ArrayList gbOutputNames = new ArrayList(); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0)); + 
gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1)); + gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2)); + GroupByDesc groupBy = new GroupByDesc(GroupByDesc.Mode.HASH, + gbOutputNames, new ArrayList(), aggs, false, + groupByMemoryUsage, memoryThreshold, null, false, 0, false); + + ArrayList groupbyColInfos = new ArrayList(); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(0), key.getTypeInfo(), "", false)); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(1), key.getTypeInfo(), "", false)); + groupbyColInfos.add(new ColumnInfo(gbOutputNames.get(2), key.getTypeInfo(), "", false)); + + GroupByOperator groupByOp = (GroupByOperator)OperatorFactory.getAndMakeChild( + groupBy, new RowSchema(groupbyColInfos), selectOp); + + groupByOp.setColumnExprMap(new HashMap()); + + // Get the column names of the aggregations for reduce sink + int colPos = 0; + ArrayList rsValueCols = new ArrayList(); + for (int i = 0; i < aggs.size() - 1; i++) { + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(key.getTypeInfo(), + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(colExpr); + } + + // Bloom Filter uses binary + ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, + gbOutputNames.get(colPos++), "", false); + rsValueCols.add(colExpr); + + + // Create the reduce sink operator + ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc( + new ArrayList(), rsValueCols, gbOutputNames, false, + -1, 0, 1, Operation.NOT_ACID); + ReduceSinkOperator rsOp = (ReduceSinkOperator)OperatorFactory.getAndMakeChild( + rsDesc, new RowSchema(groupByOp.getSchema()), groupByOp); + Map columnExprMap = new HashMap(); + rsOp.setColumnExprMap(columnExprMap); + + // Create the final Group By Operator + ArrayList aggsFinal = new ArrayList(); + try { + List minFinalFnOIs = new ArrayList(); + List maxFinalFnOIs = new ArrayList(); + List bloomFilterFinalFnOIs = new ArrayList(); + ArrayList minFinalParams = new ArrayList(); + ArrayList maxFinalParams = new ArrayList(); + ArrayList bloomFilterFinalParams = new ArrayList(); + // Use the expressions from Reduce Sink. + minFinalFnOIs.add(rsValueCols.get(0).getWritableObjectInspector()); + maxFinalFnOIs.add(rsValueCols.get(1).getWritableObjectInspector()); + bloomFilterFinalFnOIs.add(rsValueCols.get(2).getWritableObjectInspector()); + // Coming from a ReduceSink the aggregations would be in the form VALUE._col0, VALUE._col1 + minFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(0).getTypeInfo(), + Utilities.ReduceField.VALUE + "." + + gbOutputNames.get(0), "", false)); + maxFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(1).getTypeInfo(), + Utilities.ReduceField.VALUE + "." + + gbOutputNames.get(1), "", false)); + bloomFilterFinalParams.add( + new ExprNodeColumnDesc( + rsValueCols.get(2).getTypeInfo(), + Utilities.ReduceField.VALUE + "." 
+ + gbOutputNames.get(2), "", false)); + + AggregationDesc min = new AggregationDesc("min", + FunctionRegistry.getGenericUDAFEvaluator("min", minFinalFnOIs, + false, false), + minFinalParams, false, Mode.FINAL); + AggregationDesc max = new AggregationDesc("max", + FunctionRegistry.getGenericUDAFEvaluator("max", maxFinalFnOIs, + false, false), + maxFinalParams, false, Mode.FINAL); + AggregationDesc bloomFilter = new AggregationDesc("bloom_filter", + FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", bloomFilterFinalFnOIs, + false, false), + bloomFilterFinalParams, false, Mode.FINAL); + GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator(); + bloomFilterEval.setSourceOperator(selectOp); + bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES)); + bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval); + + aggsFinal.add(min); + aggsFinal.add(max); + aggsFinal.add(bloomFilter); + } catch (SemanticException e) { + LOG.error("Error creating min/max aggregations on key", e); + throw new IllegalStateException("Error creating min/max aggregations on key", e); + } + + GroupByDesc groupByDescFinal = new GroupByDesc(GroupByDesc.Mode.FINAL, + gbOutputNames, new ArrayList(), aggsFinal, false, + groupByMemoryUsage, memoryThreshold, null, false, 0, false); + GroupByOperator groupByOpFinal = (GroupByOperator)OperatorFactory.getAndMakeChild( + groupByDescFinal, new RowSchema(rsOp.getSchema()), rsOp); + groupByOpFinal.setColumnExprMap(new HashMap()); + + SparkRuntimeFilterPruningSinkDesc desc = new SparkRuntimeFilterPruningSinkDesc(); + desc.setTableScan(ts); + desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(rsValueCols, "_col"))); + desc.setTargetColumnName(column); + desc.setRsValueCols(rsValueCols); + //currently not store the partKey value + desc.setPartKey(null); + SparkRuntimeFilterPruningSinkOperator rfOp = (SparkRuntimeFilterPruningSinkOperator)OperatorFactory.getAndMakeChild( + desc, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal); + + LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rfOp + ": " + ts); + parseContext.getRfOpToTsOpMap().put(rfOp, ts); + + // Save the info that is required at query time to resolve dynamic/runtime values. + RuntimeValuesInfo runtimeValuesInfo = new RuntimeValuesInfo(); + TableDesc rsFinalTableDesc = PlanUtils.getReduceValueTableDesc( + PlanUtils.getFieldSchemasFromColumnList(rsValueCols, "_col")); + List dynamicValueIDs = new ArrayList(); + dynamicValueIDs.add(keyBaseAlias + "_min"); + dynamicValueIDs.add(keyBaseAlias + "_max"); + dynamicValueIDs.add(keyBaseAlias + "_bloom_filter"); + + runtimeValuesInfo.setTableDesc(rsFinalTableDesc); + runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs); + runtimeValuesInfo.setColExprs(rsValueCols); + parseContext.getRfToRuntimeValuesInfo().put(rfOp, runtimeValuesInfo); + + return true; + } + // Generates plan for min/max when dynamic partition pruning is ruled out. 
private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext, TableScanOperator ts, String keyBaseAlias, String internalColName, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 99618c6..502f6b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -151,6 +151,7 @@ import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode; import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorSparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorLimitDesc; import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc; @@ -160,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -4715,6 +4717,16 @@ private static VectorPTFInfo createVectorPTFInfo(Operator rsValueCols = new ArrayList(); + + private Path path; + + private String targetWork; + + private BaseWork targetMapWork; + + @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED }) + public Path getPath() { + return path; + } + + public void setPath(Path path) { + this.path = path; + } + + @Explain(displayName = "target work") + public String getTargetWork() { + return this.targetWork; + } + + public void setTargetWork(String targetWork) { + this.targetWork = targetWork; + } + + public TableScanOperator getTableScan() { + return tableScan; + } + + public void setTableScan(TableScanOperator tableScan) { + this.tableScan = tableScan; + } + + @Explain(displayName = "target column name") + public String getTargetColumnName() { + return targetColumnName; + } + + public void setTargetColumnName(String targetColumnName) { + this.targetColumnName = targetColumnName; + } + + public ExprNodeDesc getPartKey() { + return partKey; + } + + public void setPartKey(ExprNodeDesc partKey) { + this.partKey = partKey; + } + + public TableDesc getTable() { + return table; + } + + public void setTable(TableDesc table) { + this.table = table; + } + + @Explain(displayName = "partition key expr") + public String getPartKeyString() { + return partKey.getExprString(); + } + + public ArrayList getRsValueCols() { + return rsValueCols; + } + + public void setRsValueCols(ArrayList rsValueCols) { + this.rsValueCols = rsValueCols; + } + + public BaseWork getTargetMapWork() { + return targetMapWork; + } + + public void setTargetMapWork(BaseWork mapWork) { + this.targetMapWork = mapWork; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 4c41920..de2d380 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import 
java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; +import org.apache.hadoop.hive.ql.parse.spark.SparkRuntimeFilterPruningSinkOperator; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.CreateViewDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -135,6 +137,11 @@ private Map> semiJoinHints; private boolean disableMapJoin; + private Map rfOpToTsOpMap = + new HashMap<>(); + private Map rfToRuntimeValuesInfo = + new HashMap(); + public ParseContext() { } @@ -707,4 +714,21 @@ public void setDisableMapJoin(boolean disableMapJoin) { public boolean getDisableMapJoin() { return disableMapJoin; } + + public Map getRfOpToTsOpMap() { + return rfOpToTsOpMap; + } + + public void setRfOpToTsOpMap(Map rfOpToTsOpMap) { + this.rfOpToTsOpMap = rfOpToTsOpMap; + } + + public Map getRfToRuntimeValuesInfo() { + return rfToRuntimeValuesInfo; + } + + public void setRfToRuntimeValuesInfo(Map + rfToRuntimeValuesInfo) { + this.rfToRuntimeValuesInfo = rfToRuntimeValuesInfo; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 0c1c4e0..f67928a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -632,6 +632,9 @@ public ParseContext getParseContext(ParseContext pCtx, List> clonedPruningTableScanSet; + // The set of pruning sinks + public final Set> runtimeFilterPruningSinkSet; + + // The set of TableScanOperators for pruning OP trees + public final Set> clonedRuntimeFilterPruningTableScanSet; + + //the set of RunTimeInfo for runtimeFilterPrunning + public final Map clonedPruningSinkRuntimeValuesInfo; + + @SuppressWarnings("unchecked") public GenSparkProcContext(HiveConf conf, @@ -189,5 +200,8 @@ public GenSparkProcContext(HiveConf conf, this.fileSinkMap = new LinkedHashMap>(); this.pruningSinkSet = new LinkedHashSet>(); this.clonedPruningTableScanSet = new LinkedHashSet>(); + this.runtimeFilterPruningSinkSet = new LinkedHashSet>(); + this.clonedRuntimeFilterPruningTableScanSet = new LinkedHashSet>(); + this.clonedPruningSinkRuntimeValuesInfo = new HashMap(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index ba4bb59..92da1c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.LinkedHashMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.ForwardOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -49,11 +51,14 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import 
org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; +import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -65,12 +70,30 @@ import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.base.Strings; + /** * GenSparkUtils is a collection of shared helper methods to produce SparkWork * Cloned from GenTezUtils. 
@@ -681,4 +704,157 @@ private static boolean hasGBYOperator(ReduceSinkOperator rs) { } return false; } + + public void processDynamicMinMaxPushDownOperator(GenSparkProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo, + SparkRuntimeFilterPruningSinkOperator rf) throws SemanticException { + + SparkRuntimeFilterPruningSinkDesc desc = rf.getConf(); + TableScanOperator ts = desc.getTableScan(); + MapWork targetWork = (MapWork) procCtx.rootToWorkMap.get(ts); + desc.setTargetMapWork(targetWork); + + Preconditions.checkArgument( + targetWork != null, + "No targetWork found for tablescan " + ts); + String targetId = SparkUtilities.getWorkId(targetWork); + + BaseWork sourceWork = getEnclosingWork(rf, procCtx); + String sourceId = SparkUtilities.getWorkId(sourceWork); + + // set up temporary path to communicate between the small/big table + Path tmpPath = targetWork.getTmpPathForRuntimeFilter(); + if (tmpPath == null) { + Path baseTmpPath = procCtx.parseContext.getContext().getMRTmpPath(); + tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId); + targetWork.setTmpPathForRuntimeFilter(tmpPath); + LOG.info("Setting tmp path between source work and target work:\n" + tmpPath); + } + + desc.setPath(new Path(tmpPath, sourceId)); + desc.setTargetWork(targetWork.getName()); + + // store table descriptor in map-targetWork + if (!targetWork.getRfSourceTableDescMap().containsKey(sourceId)) { + targetWork.getRfSourceTableDescMap().put(sourceId, new LinkedList()); + } + List tables = targetWork.getRfSourceTableDescMap().get(sourceId); + tables.add(rf.getConf().getTable()); + + BaseWork childWork = procCtx.rootToWorkMap.get(ts); + + + // Set up the dynamic values in the childWork. + RuntimeValuesInfo childRuntimeValuesInfo = + new RuntimeValuesInfo(); + childRuntimeValuesInfo.setTableDesc(runtimeValuesInfo.getTableDesc()); + childRuntimeValuesInfo.setDynamicValueIDs(runtimeValuesInfo.getDynamicValueIDs()); + childRuntimeValuesInfo.setColExprs(runtimeValuesInfo.getColExprs()); + childWork.setInputSourceToRuntimeValuesInfo( + sourceWork.getName(), childRuntimeValuesInfo); + } + + // Functionality to remove semi-join optimization + public static void removeSemiJoinOperator(ParseContext context, + SparkRuntimeFilterPruningSinkOperator rf, + TableScanOperator ts) throws SemanticException{ + // Cleanup the synthetic predicate in the tablescan operator by + // replacing it with "true" + LOG.debug("Removing SparkRuntimeFilterPruningSinkOperator " + rf + " and TableScan " + ts); + ExprNodeDesc constNode = new ExprNodeConstantDesc( + TypeInfoFactory.booleanTypeInfo, Boolean.TRUE); + DynamicValuePredicateContext filterDynamicValuePredicatesCollection = + new DynamicValuePredicateContext(); + FilterDesc filterDesc = ((FilterOperator)(ts.getChildOperators().get(0))).getConf(); + collectDynamicValuePredicates(filterDesc.getPredicate(), + filterDynamicValuePredicatesCollection); + for (ExprNodeDesc nodeToRemove : filterDynamicValuePredicatesCollection + .childParentMapping.keySet()) { + // Find out if this synthetic predicate belongs to the current cycle + boolean skip = true; + for (ExprNodeDesc expr : nodeToRemove.getChildren()) { + if (expr instanceof ExprNodeDynamicValueDesc ) { + String dynamicValueIdFromExpr = ((ExprNodeDynamicValueDesc) expr) + .getDynamicValue().getId(); + List dynamicValueIdsFromMap = context. 
+ getRfToRuntimeValuesInfo().get(rf).getDynamicValueIDs(); + for (String dynamicValueIdFromMap : dynamicValueIdsFromMap) { + if (dynamicValueIdFromExpr.equals(dynamicValueIdFromMap)) { + // Intended predicate to be removed + skip = false; + break; + } + } + } + } + if (!skip) { + ExprNodeDesc nodeParent = filterDynamicValuePredicatesCollection + .childParentMapping.get(nodeToRemove); + if (nodeParent == null) { + // This was the only predicate, set filter expression to const + filterDesc.setPredicate(constNode); + } else { + int i = nodeParent.getChildren().indexOf(nodeToRemove); + nodeParent.getChildren().remove(i); + nodeParent.getChildren().add(i, constNode); + } + // skip the rest of the predicates + skip = true; + } + } + context.getRfOpToTsOpMap().remove(rf); + } + + private static class DynamicValuePredicateContext implements NodeProcessorCtx { + HashMap childParentMapping = new HashMap(); + } + + private static class DynamicValuePredicateProc implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + GenSparkUtils.DynamicValuePredicateContext ctx = (GenSparkUtils.DynamicValuePredicateContext) procCtx; + ExprNodeDesc parent = (ExprNodeDesc) stack.get(stack.size() - 2); + if (parent instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc parentFunc = (ExprNodeGenericFuncDesc) parent; + if (parentFunc.getGenericUDF() instanceof GenericUDFBetween || + parentFunc.getGenericUDF() instanceof GenericUDFInBloomFilter) { + ExprNodeDesc grandParent = stack.size() >= 3 ? + (ExprNodeDesc) stack.get(stack.size() - 3) : null; + ctx.childParentMapping.put(parentFunc, grandParent); + } + } + + return null; + } + } + + + private static void collectDynamicValuePredicates(ExprNodeDesc pred, NodeProcessorCtx ctx) throws SemanticException { + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack. The dispatcher + // generates the plan from the operator tree + Map exprRules = new LinkedHashMap(); + exprRules.put(new RuleRegExp("R1", ExprNodeDynamicValueDesc.class.getName() + "%"), new GenSparkUtils.DynamicValuePredicateProc()); + Dispatcher disp = new DefaultRuleDispatcher(null, exprRules, ctx); + GraphWalker egw = new DefaultGraphWalker(disp); + List startNodes = new ArrayList(); + startNodes.add(pred); + + egw.startWalking(startNodes, null); + } + /** + * getEncosingWork finds the BaseWork any given operator belongs to. 
+ */ + public BaseWork getEnclosingWork(Operator op, GenSparkProcContext procCtx) { + List> ops = new ArrayList>(); + OperatorUtils.findRoots(op, ops); + for (Operator r : ops) { + BaseWork work = procCtx.rootToWorkMap.get(r); + if (work != null) { + return work; + } + } + return null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 08e7f43..5c08110 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.conf.HiveConf; @@ -93,6 +94,8 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; import org.apache.hadoop.hive.ql.session.SessionState; /** @@ -126,13 +129,12 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, // Run Join releated optimizations runJoinOptimizations(procCtx); + // Remove cyclic dependencies for DPP and runtime filter + runCycleAnalysisForPartitionPruning(procCtx); + if(conf.isSparkDPPAny()){ // Remove DPP based on expected size of the output data runRemoveDynamicPruning(procCtx); - - // Remove cyclic dependencies for DPP - runCycleAnalysisForPartitionPruning(procCtx); - // Remove nested DPPs SparkUtilities.removeNestedDPP(procCtx); } @@ -165,8 +167,12 @@ private void runRemoveDynamicPruning(OptimizeSparkProcContext procCtx) throws Se ogw.startWalking(topNodes, null); } - private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) { + private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) throws SemanticException{ + if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING) && !conf.getBoolVar(HiveConf.ConfVars + .SPARK_DYNAMIC_RUNTIMEFILTER_PRUNING)) { + return; + } boolean cycleFree = false; while (!cycleFree) { cycleFree = true; @@ -189,27 +195,69 @@ private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCt } } - private void removeDPPOperator(Set> component, OptimizeSparkProcContext context) { - SparkPartitionPruningSinkOperator toRemove = null; + private void removeDPPOperator(Set> component, OptimizeSparkProcContext context) throws SemanticException{ + SparkPartitionPruningSinkOperator toRemovePP = null; + SparkRuntimeFilterPruningSinkOperator toRemoveRF = null; + TableScanOperator toRemoveTS = null; for (Operator o : component) { if (o instanceof SparkPartitionPruningSinkOperator) { // we want to remove the DPP with bigger data size - if (toRemove == null - || o.getConf().getStatistics().getDataSize() > toRemove.getConf().getStatistics() - .getDataSize()) { - toRemove = (SparkPartitionPruningSinkOperator) o; + //TODO whether the if judge affect the DPP implementation ??? 
+ if (toRemovePP == null + || o.getConf().getStatistics().getDataSize() > toRemovePP.getConf().getStatistics() + .getDataSize()) { + toRemovePP = (SparkPartitionPruningSinkOperator) o; + } + } else if (o instanceof SparkRuntimeFilterPruningSinkOperator){ + TableScanOperator ts = context.getParseContext().getRfOpToTsOpMap().get(o); + + if (ts == null) { + continue; + } + + if (toRemoveRF == null + || ts.getStatistics().getDataSize() < + toRemoveTS.getStatistics().getDataSize()) { + toRemoveRF = (SparkRuntimeFilterPruningSinkOperator) o; + toRemoveTS = ts; } } + + } + Operator toRemove = toRemoveRF; + if (toRemoveRF == null && toRemovePP != null) { + toRemove = toRemovePP; + } else if (toRemovePP == null){ + // do nothing + } else { + // Cycle consists of atleast one dynamic partition pruning(DPP) + // optimization and atleast one min/max optimization. + // DPP is a better optimization unless it ends up scanning the + // bigger table for keys instead of the smaller table. + + // Get the parent TS of victimRS. + // copy the TezCompiler code + /*Operator op = victimRS; + while(!(op instanceof TableScanOperator)) { + op = op.getParentOperators().get(0); + } + if ((2 * op.getStatistics().getDataSize()) < + victimAM.getStatistics().getDataSize()) { + victim = victimAM; + }*/ } - if (toRemove == null) { + if (toRemove == null ) { return; } - OperatorUtils.removeBranch(toRemove); - // at this point we've found the fork in the op pipeline that has the pruning as a child plan. + if (toRemove == toRemoveRF) { + GenSparkUtils.removeSemiJoinOperator(context.getParseContext(), toRemoveRF, toRemoveTS); + } + + /*// at this point we've found the fork in the op pipeline that has the pruning as a child plan. LOG.info("Disabling dynamic pruning for: " - + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency"); + + toRemove.getConf().getTableScan().toString() + ". 
Needed to break cyclic dependency");*/ } // Tarjan's algo @@ -244,7 +292,13 @@ private void connect(Operator o, AtomicInteger index, Stack> node TableScanOperator ts = ((SparkPartitionPruningSinkDesc) o.getConf()).getTableScan(); LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); children.add(ts); - } else { + } else if (o instanceof SparkRuntimeFilterPruningSinkOperator){ + children = new ArrayList<>(); + children.addAll(o.getChildOperators()); + TableScanOperator ts = ((SparkRuntimeFilterPruningSinkDesc) o.getConf()).getTableScan(); + LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString()); + children.add(ts); + }else { children = o.getChildOperators(); } @@ -275,7 +329,7 @@ private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws Semanti private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx) throws SemanticException { - if (!conf.isSparkDPPAny()) { + if (!conf.isSparkDPPAny() && !conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_RUNTIMEFILTER_PRUNING)) { return; } @@ -356,6 +410,9 @@ protected void generateTaskTree(List> rootTasks, Pa opRules.put(new RuleRegExp("Clone OP tree for PartitionPruningSink", SparkPartitionPruningSinkOperator.getOperatorName() + "%"), new SplitOpTreeForDPP()); + opRules.put(new RuleRegExp("Clone OP tree for RuntimeFilterPruningSink", + SparkRuntimeFilterPruningSinkOperator.getOperatorName() + "%"), + new SplitOpTreeForRFP()); Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx); @@ -389,6 +446,47 @@ protected void generateTaskTree(List> rootTasks, Pa procCtx.currentTask = mainTask; } + topNodes.clear(); + List tmpResult = new ArrayList<>(); + for (Operator ts : procCtx.clonedRuntimeFilterPruningTableScanSet){ + FilterOperator fl = (FilterOperator) ts.getChildren().get(0); + if (!isContainDynamicValue(fl.getConf())){ + topNodes.add(ts); + } else { + tmpResult.add(ts); + } + } + //remove the DynamicValue info of FIL in SparkRuntimeFilterPruningSinkOperator branch + for (Node ts: tmpResult) { + Iterator it = tempParseContext.getRfOpToTsOpMap().keySet().iterator(); + while (it.hasNext()){ + SparkRuntimeFilterPruningSinkOperator rf = (SparkRuntimeFilterPruningSinkOperator)it.next(); + TableScanOperator tso = rf.getConf().getTableScan(); + + if (((TableScanOperator) ts).getOperatorId().equals(tso.getOperatorId())){ + it.remove(); + // here ts is the TS of SparkRuntimeFilterPruningSinkOperator branch not the ts of target work + GenSparkUtils.removeSemiJoinOperator(tempParseContext, rf, (TableScanOperator) ts); + } + } + } + for (Node ts : tmpResult){ + topNodes.add(ts); + } + + if (!topNodes.isEmpty()) { + SparkTask pruningTask = SparkUtilities.createSparkTask(conf); + SparkTask mainTask = procCtx.currentTask; + pruningTask.addDependentTask(procCtx.currentTask); + procCtx.rootTasks.remove(procCtx.currentTask); + procCtx.rootTasks.add(pruningTask); + procCtx.currentTask = pruningTask; + + generateTaskTreeHelper(procCtx, topNodes); + procCtx.currentTask = mainTask; + } + + // -------------------------------- Post Pass ---------------------------------- // // we need to clone some operator plans and remove union operators still @@ -409,9 +507,23 @@ protected void generateTaskTree(List> rootTasks, Pa utils.processPartitionPruningSink(procCtx, (SparkPartitionPruningSinkOperator) prunerSink); } + // Connect any edges required for min/max pushdown + for(Operator rf : procCtx.runtimeFilterPruningSinkSet){ + 
utils.processDynamicMinMaxPushDownOperator(procCtx, procCtx.clonedPruningSinkRuntimeValuesInfo.get(rf), + (SparkRuntimeFilterPruningSinkOperator) rf); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE); } + private boolean isContainDynamicValue(FilterDesc fd) { + String predicateString = fd.getPredicateString(); + if (predicateString.contains("DynamicValue")){ + return true; + } else { + return false; + } + } private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topNodes) throws SemanticException { // create a walker which walks the tree in a DFS manner while maintaining @@ -425,6 +537,9 @@ private void generateTaskTreeHelper(GenSparkProcContext procCtx, List topN opRules.put(new RuleRegExp("Split Work - SparkPartitionPruningSink", SparkPartitionPruningSinkOperator.getOperatorName() + "%"), genSparkWork); + opRules.put(new RuleRegExp("Split Work - SparkRuntimeFilterPruningSink", + SparkRuntimeFilterPruningSinkOperator.getOperatorName() + "%"), genSparkWork); + opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc()); opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuntimeFilterPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuntimeFilterPruningSinkOperator.java new file mode 100644 index 0000000..1c4a4ff --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkRuntimeFilterPruningSinkOperator.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.parse.spark; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkRuntimeFilterPruningSinkDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + + +public class SparkRuntimeFilterPruningSinkOperator extends Operator { + @SuppressWarnings("deprecation") + protected transient Serializer serializer; + protected transient DataOutputBuffer buffer; + protected static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeFilterPruningSinkOperator.class); + protected Configuration hconf; + protected Writable data; + + /** Kryo ctor. */ + @VisibleForTesting + public SparkRuntimeFilterPruningSinkOperator() { + super(); + } + + public SparkRuntimeFilterPruningSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + + @Override + @SuppressWarnings("deprecation") + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + serializer = (Serializer) ReflectionUtils.newInstance( + conf.getTable().getDeserializerClass(), null); + buffer = new DataOutputBuffer(); + this.hconf = hconf; + } + + @Override + public void process(Object row, int tag) throws HiveException { + ObjectInspector rowInspector = inputObjInspectors[0]; + try { + Writable writableRow = serializer.serialize(row, rowInspector); + writableRow.write(buffer); + } catch (Exception e) { + throw new HiveException(e); + } + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if (!abort) { + try { + flushToFile(); + } catch (Exception e) { + throw new HiveException(e); + } + } + } + + private void flushToFile() throws IOException { + // write an intermediate file to the specified path + // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt + Path path = conf.getPath(); + FileSystem fs = path.getFileSystem(this.getConfiguration()); + fs.mkdirs(path); + + while (true) { + path = new Path(path, String.valueOf(Utilities.randGen.nextInt())); + if (!fs.exists(path)) { + break; + } + } + + short numOfRepl = fs.getDefaultReplication(path); + + ObjectOutputStream out = null; + FSDataOutputStream fsout = null; + + try { + fsout = fs.create(path, numOfRepl); + out = new ObjectOutputStream(new BufferedOutputStream(fsout, 4096)); + out.writeUTF(conf.getTargetColumnName()); + buffer.writeTo(out); + } catch (Exception e) { + try { + fs.delete(path, false); + } catch (Exception ex) { + LOG.warn("Exception happened while trying to clean partial file."); + } + throw e; + } finally { + if (out != null) { + LOG.info("Flushed to file: " + path); + out.close(); + } else if (fsout != null) { + fsout.close(); + } + } + } + + @Override + public OperatorType getType() { + return 
OperatorType.SPARKRUNTIMEFILTERPRUNINGSINK; + } + + @Override + public String getName() { + return SparkRuntimeFilterPruningSinkOperator.getOperatorName(); + } + + public static String getOperatorName() { + return "SPARKRUNTIMEFILTERPRUNINGSINK"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForRFP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForRFP.java new file mode 100644 index 0000000..559011a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForRFP.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.spark; + + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.*; + +public class SplitOpTreeForRFP implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + SparkRuntimeFilterPruningSinkOperator pruningSinkOperator = (SparkRuntimeFilterPruningSinkOperator) nd; + GenSparkProcContext context = (GenSparkProcContext) procCtx; + + // Locate the op where the branch starts + // This is guaranteed to succeed since the branch always follows the same pattern + // as the pruning-sink branch handled by SplitOpTreeForDPP. + Operator parentSelOp = pruningSinkOperator; + Operator childSelOp = null; + while (parentSelOp != null) { + if (parentSelOp.getNumChild() > 1) { + break; + } else { + childSelOp = parentSelOp; + parentSelOp = parentSelOp.getParentOperators().get(0); + } + } + + // Check if this is a MapJoin. If so, do not split. 
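+ // When the shared branch feeds a ReduceSinkOperator whose first child is a MapJoinOperator, the + // operator tree is left unsplit and this processor returns early.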
+ for (Operator childOp : parentSelOp.getChildOperators()) { + if (childOp instanceof ReduceSinkOperator && + childOp.getChildOperators().get(0) instanceof MapJoinOperator) { + // TODO: decide whether the pruning sink should still be registered here, e.g. + // context.pruningSinkSet.add(pruningSinkOperator); + return null; + } + } + List> roots = new LinkedList>(); + collectRoots(roots, pruningSinkOperator); + + List> savedChildOps = parentSelOp.getChildOperators(); + parentSelOp.setChildOperators(Utilities.makeList(childSelOp)); + + // Now clone the tree above childSelOp + List> newRoots = SerializationUtilities.cloneOperatorTree(roots); + for (int i = 0; i < roots.size(); i++) { + TableScanOperator newTs = (TableScanOperator) newRoots.get(i); + TableScanOperator oldTs = (TableScanOperator) roots.get(i); + newTs.getConf().setTableMetadata(oldTs.getConf().getTableMetadata()); + } + context.clonedRuntimeFilterPruningTableScanSet.addAll(newRoots); + + // Restore broken links between operators, and remove the branch from the original tree + parentSelOp.setChildOperators(savedChildOps); + parentSelOp.removeChild(childSelOp); + + // Find the cloned pruning sink and add it to runtimeFilterPruningSinkSet + Set> sinkSet = new HashSet>(); + for (Operator root : newRoots) { + SparkUtilities.collectOp(sinkSet, root, SparkRuntimeFilterPruningSinkOperator.class); + } + + SparkRuntimeFilterPruningSinkOperator clonedPruningSinkOp = + (SparkRuntimeFilterPruningSinkOperator) sinkSet.iterator().next(); + clonedPruningSinkOp.getConf().setTableScan(pruningSinkOperator.getConf().getTableScan()); + context.runtimeFilterPruningSinkSet.add(clonedPruningSinkOp); + context.clonedPruningSinkRuntimeValuesInfo.put(clonedPruningSinkOp, context.parseContext.getRfToRuntimeValuesInfo().get(pruningSinkOperator)); + + return null; + } + + /** + * Recursively collect all roots (e.g., table scans) that can be reached via this op. + * @param result contains all roots that can be reached via op + * @param op the op to examine. 
+ */ + private void collectRoots(List> result, Operator op) { + if (op.getNumParent() == 0) { + result.add(op); + } else { + for (Operator parentOp : op.getParentOperators()) { + collectRoots(result, parentOp); + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index f2b2fc5..2470630 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -102,6 +102,8 @@ private Path tmpPathForPartitionPruning; + private Path tmpPathForRuntimeFilter; + private String inputformat; private String indexIntermediateFile; @@ -142,6 +144,10 @@ private Map> eventSourcePartKeyExprMap = new LinkedHashMap>(); + // used for runtime filter + private Map> rfSourceTableDescMap = + new LinkedHashMap<>(); + private boolean doSplitsGrouping = true; private VectorizedRowBatch vectorizedRowBatch; @@ -595,6 +601,14 @@ public void setTmpPathForPartitionPruning(Path tmpPathForPartitionPruning) { this.tmpPathForPartitionPruning = tmpPathForPartitionPruning; } + public Path getTmpPathForRuntimeFilter() { + return tmpPathForRuntimeFilter; + } + + public void setTmpPathForRuntimeFilter(Path tmpPathForRuntimeFilter) { + this.tmpPathForRuntimeFilter = tmpPathForRuntimeFilter; + } + public void mergingInto(MapWork mapWork) { // currently, this is sole field affecting mergee task mapWork.useBucketizedHiveInputFormat |= useBucketizedHiveInputFormat; @@ -663,6 +677,14 @@ public void setEventSourceColumnNameMap(Map> map) { this.eventSourceColumnNameMap = map; } + public Map> getRfSourceTableDescMap() { + return rfSourceTableDescMap; + } + + public void setRfSourceTableDescMap(Map> rfSourceTableDescMap) { + this.rfSourceTableDescMap = rfSourceTableDescMap; + } + public Map> getEventSourceColumnNameMap() { return eventSourceColumnNameMap; } @@ -884,4 +906,5 @@ public MapExplainVectorization getMapExplainVectorization() { } return new MapExplainVectorization(this); } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorSparkRuntimeFilterPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorSparkRuntimeFilterPruningSinkDesc.java new file mode 100644 index 0000000..9ed32b7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorSparkRuntimeFilterPruningSinkDesc.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +/** + * VectorSparkRuntimeFilterPruningSinkDesc. + * + * Extra parameters beyond SparkRuntimeFilterPruningSinkDesc just for the VectorSparkRuntimeFilterPruningSinkDesc. 
+ * + * We don't extend SparkRuntimeFilterPruningSinkDesc because the base OperatorDesc doesn't support + * clone and adding it is a lot work for little gain. + */ +public class VectorSparkRuntimeFilterPruningSinkDesc extends AbstractVectorDesc { + private static long serialVersionUID = 1L; + + public VectorSparkRuntimeFilterPruningSinkDesc() { + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java index dec2d1e..7540066 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java @@ -76,6 +76,9 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } else if ((queryEngine.equals("spark") && pctx.getConf().isSparkDPPAny())) { enabled = true; + } else if ((queryEngine.equals("spark") + && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_RUNTIMEFILTER_PRUNING))){ + enabled = true; } if (!enabled) { diff --git a/ql/src/test/queries/clientpositive/spark_runtime_filter_pruning.q b/ql/src/test/queries/clientpositive/spark_runtime_filter_pruning.q new file mode 100644 index 0000000..a21bc48 --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_runtime_filter_pruning.q @@ -0,0 +1,10 @@ +set hive.optimize.index.filter=true; +set hive.auto.convert.join=false; +set hive.spark.dynamic.runtimefilter.pruning=true; + +create table pokes(foo int); +create table poke1(foo1 int, fil string); +insert into table pokes values(1); +insert into table poke1 values(1, "123"); + +explain select count(*) from pokes join poke1 on (pokes.foo = poke1.foo1) where poke1.fil=123; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/spark/spark_runtime_filter_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_runtime_filter_pruning.q.out new file mode 100644 index 0000000..504d827 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_runtime_filter_pruning.q.out @@ -0,0 +1,171 @@ +PREHOOK: query: create table pokes(foo int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@pokes +POSTHOOK: query: create table pokes(foo int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@pokes +PREHOOK: query: create table poke1(foo1 int, fil string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@poke1 +POSTHOOK: query: create table poke1(foo1 int, fil string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@poke1 +PREHOOK: query: insert into table pokes values(1) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@pokes +POSTHOOK: query: insert into table pokes values(1) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@pokes +POSTHOOK: Lineage: pokes.foo SCRIPT [] +PREHOOK: query: insert into table poke1 values(1, "123") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@poke1 +POSTHOOK: query: insert into table poke1 values(1, "123") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@poke1 +POSTHOOK: Lineage: poke1.fil SCRIPT [] +POSTHOOK: Lineage: poke1.foo1 SCRIPT [] +PREHOOK: query: explain select count(*) from pokes join poke1 on (pokes.foo = poke1.foo1) where poke1.fil=123 +PREHOOK: type: QUERY +POSTHOOK: query: 
explain select count(*) from pokes join poke1 on (pokes.foo = poke1.foo1) where poke1.fil=123 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark + Edges: + Reducer 6 <- Map 5 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan + alias: pokes + filterExpr: foo is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: foo is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: foo (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=1) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: binary) + Reducer 6 + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=1) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Spark Runtime Filter Partition Pruning Sink Operator + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + target column name: foo1 + target work: Map 4 + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4) + Reducer 3 <- Reducer 2 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: pokes + filterExpr: foo is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: foo is not null (type: boolean) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: foo (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Map 4 + Map Operator Tree: + TableScan + alias: poke1 + filterExpr: ((UDFToDouble(fil) = 123.0) and foo1 is not null and (foo1 BETWEEN DynamicValue(RS_6_pokes_foo_min) AND DynamicValue(RS_6_pokes_foo_max) and in_bloom_filter(foo1, DynamicValue(RS_6_pokes_foo_bloom_filter)))) (type: boolean) + Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((UDFToDouble(fil) = 123.0) and (foo1 BETWEEN DynamicValue(RS_6_pokes_foo_min) AND DynamicValue(RS_6_pokes_foo_max) and in_bloom_filter(foo1, DynamicValue(RS_6_pokes_foo_bloom_filter))) and foo1 is not null) (type: boolean) + Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: foo1 
(type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +
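
Note on the intermediate file written by SparkRuntimeFilterPruningSinkOperator.flushToFile() above: the layout is an ObjectOutputStream containing the target column name (writeUTF) followed by the raw bytes of the serialized row Writables accumulated in the DataOutputBuffer. The reader side is not part of this patch, so the sketch below is only illustrative; the class and method names (RuntimeFilterFileReaderSketch, readTargetColumnName) are hypothetical and the row-deserialization step is left as a placeholder.

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RuntimeFilterFileReaderSketch {
  // Reads one file produced by SparkRuntimeFilterPruningSinkOperator.flushToFile()
  // (path layout: tmpPath/targetWorkId/sourceWorkId/randInt, per the comment in that method).
  public static String readTargetColumnName(FileSystem fs, Path path) throws IOException {
    try (FSDataInputStream fsin = fs.open(path);
         ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(fsin, 4096))) {
      // First field: the target column name written with writeUTF().
      String targetColumnName = in.readUTF();
      // The remaining bytes are the row Writables flushed from the DataOutputBuffer. A real reader
      // would deserialize them with the source table's SerDe and register the min/max/bloom-filter
      // values behind the DynamicValue expressions; that part is omitted in this sketch.
      byte[] scratch = new byte[4096];
      while (in.read(scratch) > 0) {
        // drain the serialized row bytes (placeholder)
      }
      return targetColumnName;
    }
  }
}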