diff --git common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java index c93059d..9163a50 100644 --- common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java +++ common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java @@ -65,7 +65,7 @@ public VertexType vertexType; public static enum EdgeType { - BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN + BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, XPROD_EDGE, UNKNOWN }; public String edgeType; diff --git common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java index 294dc6b..c81d655 100644 --- common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java +++ common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java @@ -34,6 +34,8 @@ public String mapEdgeType(String edgeName) { return "PARTITION_ONLY_SHUFFLE"; case "CUSTOM_EDGE": return "MULTICAST"; + case "XPROD_EDGE": + return "XPROD_EDGE"; default: return "UNKNOWN"; } diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 99c26ce..5a84ecf 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2936,6 +2936,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal 0.5f, "The maximum fraction of JVM memory which Tez will reserve for the processor"), TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION("hive.tez.task.scale.memory.reserve.fraction", -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"), + TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled", + false, "Use Tez cartesian product edge to speed up cross product"), // The default is different on the client and server, so it's null here. 
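(A note on the new flag, not part of the patch: hive.tez.cartesian-product.enabled is read back during physical optimization. A minimal sketch of that lookup, mirroring the call CrossProductHandler.resolve() makes later in this patch; the wrapper method itself is hypothetical:)

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical convenience wrapper: true when Hive should replace
    // unkeyed shuffle edges with Tez cartesian product (XPROD) edges.
    static boolean isCartesianProductEdgeEnabled(HiveConf conf) {
      return HiveConf.getBoolVar(conf,
          HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
    }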
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true, diff --git data/conf/llap/hive-site.xml data/conf/llap/hive-site.xml index 05ab6ee..8841d8f 100644 --- data/conf/llap/hive-site.xml +++ data/conf/llap/hive-site.xml @@ -329,4 +329,9 @@ + + hive.tez.cartesian-product.enabled + true + + diff --git data/conf/tez/hive-site.xml data/conf/tez/hive-site.xml index dbff10c..0fe4c55 100644 --- data/conf/tez/hive-site.xml +++ data/conf/tez/hive-site.xml @@ -273,4 +273,9 @@ true + + hive.tez.cartesian-product.enabled + true + + diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 5ab3076..3db799e 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -132,6 +132,9 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ count.q,\ create_merge_compressed.q,\ cross_join.q,\ + cross_prod_1.q,\ + cross_prod_2.q,\ + cross_prod_3.q,\ cross_product_check_1.q,\ cross_product_check_2.q,\ ctas.q,\ @@ -487,6 +490,9 @@ minillaplocal.query.files=acid_globallimit.q,\ correlationoptimizer4.q,\ correlationoptimizer6.q,\ disable_merge_for_bucketing.q,\ + cross_prod_1.q,\ + cross_prod_2.q,\ + cross_prod_3.q,\ dynamic_partition_pruning.q,\ dynamic_semijoin_reduction.q,\ dynamic_semijoin_reduction_2.q,\ diff --git pom.xml pom.xml index e0aae27..21d134f 100644 --- pom.xml +++ pom.xml @@ -188,7 +188,7 @@ 1.7.10 4.0.4 3.0.0-SNAPSHOT - 0.8.4 + 0.9.0-SNAPSHOT 0.92.0-incubating 2.2.0 2.0.0 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 6497495..58c990c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -43,6 +43,9 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang.StringUtils; +import org.apache.tez.runtime.library.api.Partitioner; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -132,6 +135,7 @@ import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; /** * DagUtils. 
DagUtils is a collection of helper methods to convert @@ -261,7 +265,7 @@ private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork */ @SuppressWarnings("rawtypes") public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, - TezEdgeProperty edgeProp, VertexType vertexType) + TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork) throws IOException { Class mergeInputClass; @@ -276,7 +280,8 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); + CustomVertexConfiguration vertexConf + = new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work)); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = @@ -292,6 +297,10 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, mergeInputClass = ConcatenatedMergedKeyValueInput.class; break; + case XPROD_EDGE: + mergeInputClass = ConcatenatedMergedKeyValueInput.class; + break; + case SIMPLE_EDGE: setupAutoReducerParallelism(edgeProp, w); // fall through @@ -301,7 +310,7 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, break; } - return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf), + return GroupInputEdge.create(group, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork), InputDescriptor.create(mergeInputClass.getName())); } @@ -315,13 +324,14 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, * @return */ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, - VertexType vertexType) + BaseWork work, TezWork tezWork) throws IOException { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); + CustomVertexConfiguration vertexConf = + new CustomVertexConfiguration(numBuckets, tezWork.getVertexType(work)); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( @@ -332,6 +342,9 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr w.setVertexManagerPlugin(desc); break; } + case XPROD_EDGE: + break; + case SIMPLE_EDGE: { setupAutoReducerParallelism(edgeProp, w); break; @@ -345,14 +358,15 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr // nothing } - return Edge.create(v, w, createEdgeProperty(edgeProp, vConf)); + return Edge.create(v, w, createEdgeProperty(w, edgeProp, vConf, work, tezWork)); } /* * Helper function to create an edge property from an edge type. 
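(Sketch, not part of the patch: the edge builders now take the child BaseWork plus the owning TezWork instead of a precomputed VertexType, so the edge property builder can walk the plan graph for cross-product sources. The updated call shape, as TezTask.build() uses later in this patch, with w, v, wx, wxConf named as there:)

    // Edge from vertex wx (built for work w) to the vertex of child work v.
    // Passing v and the whole TezWork lets createEdgeProperty() find every
    // XPROD_EDGE parent of v when assembling a cartesian product edge.
    TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
    Edge e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
    dag.addEdge(e);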
*/ - private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) - throws IOException { + private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp, + Configuration conf, BaseWork work, TezWork tezWork) + throws IOException { MRHelpers.translateMRConfToTez(conf); String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); @@ -397,7 +411,22 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) .build(); return et3Conf.createDefaultEdgeProperty(); + case XPROD_EDGE: + EdgeManagerPluginDescriptor edgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName()); + List crossProductSources = new ArrayList<>(); + for (BaseWork parentWork : tezWork.getParents(work)) { + if (EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) { + crossProductSources.add(parentWork.getName()); + } + } + CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources); + edgeManagerDescriptor.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))); + UnorderedPartitionedKVEdgeConfig cpEdgeConf = + UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, RoundRobinPartitioner.class.getName()).build(); + return cpEdgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor); case SIMPLE_EDGE: + // fallthrough default: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); @@ -412,6 +441,15 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration } } + public static class RoundRobinPartitioner implements Partitioner { + int i = 0; + + @Override + public int getPartition(Object key, Object value, int numPartitions) { + return (i++) % numPartitions; + } + } + /** * Utility method to create a stripped down configuration for the MR partitioner. 
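(Illustration, not part of the patch: a cross product has no join keys to hash on, so RoundRobinPartitioner simply cycles through partitions to spread records evenly. Assumed usage:)

    // Successive records rotate through partitions, wrapping around.
    DagUtils.RoundRobinPartitioner p = new DagUtils.RoundRobinPartitioner();
    p.getPartition("any-key", "any-value", 3); // returns 0
    p.getPartition("any-key", "any-value", 3); // returns 1
    p.getPartition("any-key", "any-value", 3); // returns 2
    p.getPartition("any-key", "any-value", 3); // returns 0 again

(The counter is per partitioner instance, so records are balanced within each producer task rather than coordinated globally across tasks.)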
* @@ -1165,6 +1203,21 @@ public Vertex createVertex(JobConf conf, BaseWork work, } else if (work instanceof MergeJoinWork) { v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx, vertexType); + // set VertexManagerPlugin if it's a cross product destination vertex + List crossProductSources = new ArrayList<>(); + for (BaseWork parentWork : tezWork.getParents(work)) { + if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) { + crossProductSources.add(parentWork.getName()); + } + } + + if (!crossProductSources.isEmpty()) { + CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources); + v.setVertexManagerPlugin( + VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName()) + .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)))); + // parallelism shouldn't be set for cartesian product vertex + } } else { // something is seriously wrong if this is happening throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 1c84c6a..6f1ece2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -410,7 +410,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, for (BaseWork v: children) { // finally we can create the grouped edge GroupInputEdge e = utils.createEdge(group, parentConf, - workToVertex.get(v), work.getEdgeProperty(w, v), work.getVertexType(v)); + workToVertex.get(v), work.getEdgeProperty(w, v), v, work); dag.addEdge(e); } @@ -439,8 +439,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Edge e = null; TezEdgeProperty edgeProp = work.getEdgeProperty(w, v); - - e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, work.getVertexType(v)); + e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work); dag.addEdge(e); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java deleted file mode 100644 index 9ad33fd..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.hadoop.hive.ql.optimizer.physical; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; -import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; -import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; -import org.apache.hadoop.hive.ql.lib.Dispatcher; -import org.apache.hadoop.hive.ql.lib.GraphWalker; -import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.lib.Rule; -import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.MergeJoinWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.ReduceWork; -import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.hive.ql.session.SessionState; - -/* - * Check each MapJoin and ShuffleJoin Operator to see they are performing a cross product. - * If yes, output a warning to the Session's console. - * The Checks made are the following: - * 1. MR, Shuffle Join: - * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then - * this is a cross product. - * The parent ReduceSinkOp is in the MapWork for the same Stage. - * 2. MR, MapJoin: - * If the keys expr list on the mapJoin Desc is an empty list for any input, - * this implies a cross product. - * 3. Tez, Shuffle Join: - * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then - * this is a cross product. - * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the - * reduceWork that contains the JoinOp. - * 4. Tez, Map Join: - * If the keys expr list on the mapJoin Desc is an empty list for any input, - * this implies a cross product. 
- */ -public class CrossProductCheck implements PhysicalPlanResolver, Dispatcher { - - protected static transient final Logger LOG = LoggerFactory - .getLogger(CrossProductCheck.class); - - @Override - public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - TaskGraphWalker ogw = new TaskGraphWalker(this); - - ArrayList topNodes = new ArrayList(); - topNodes.addAll(pctx.getRootTasks()); - - ogw.startWalking(topNodes, null); - return pctx; - } - - @Override - public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) - throws SemanticException { - @SuppressWarnings("unchecked") - Task currTask = (Task) nd; - if (currTask instanceof MapRedTask) { - MapRedTask mrTsk = (MapRedTask)currTask; - MapredWork mrWrk = mrTsk.getWork(); - checkMapJoins(mrTsk); - checkMRReducer(currTask.toString(), mrWrk); - } else if (currTask instanceof ConditionalTask ) { - List> taskListInConditionalTask = - ((ConditionalTask) currTask).getListTasks(); - for(Task tsk: taskListInConditionalTask){ - dispatch(tsk, stack, nodeOutputs); - } - - } else if (currTask instanceof TezTask) { - TezTask tzTask = (TezTask) currTask; - TezWork tzWrk = tzTask.getWork(); - checkMapJoins(tzWrk); - checkTezReducer(tzWrk); - } - return null; - } - - private void warn(String msg) { - SessionState.getConsole().getInfoStream().println( - String.format("Warning: %s", msg)); - } - - private void checkMapJoins(MapRedTask mrTsk) throws SemanticException { - MapredWork mrWrk = mrTsk.getWork(); - MapWork mapWork = mrWrk.getMapWork(); - List warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork); - if (!warnings.isEmpty()) { - for (String w : warnings) { - warn(w); - } - } - ReduceWork redWork = mrWrk.getReduceWork(); - if (redWork != null) { - warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork); - if (!warnings.isEmpty()) { - for (String w : warnings) { - warn(w); - } - } - } - } - - private void checkMapJoins(TezWork tzWrk) throws SemanticException { - for(BaseWork wrk : tzWrk.getAllWork() ) { - - if ( wrk instanceof MergeJoinWork ) { - wrk = ((MergeJoinWork)wrk).getMainWork(); - } - - List warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); - if ( !warnings.isEmpty() ) { - for(String w : warnings) { - warn(w); - } - } - } - } - - private void checkTezReducer(TezWork tzWrk) throws SemanticException { - for(BaseWork wrk : tzWrk.getAllWork() ) { - - if ( wrk instanceof MergeJoinWork ) { - wrk = ((MergeJoinWork)wrk).getMainWork(); - } - - if ( !(wrk instanceof ReduceWork ) ) { - continue; - } - ReduceWork rWork = (ReduceWork) wrk; - Operator reducer = ((ReduceWork)wrk).getReducer(); - if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { - Map rsInfo = - new HashMap(); - for(Map.Entry e : rWork.getTagToInput().entrySet()) { - rsInfo.putAll(getReducerInfo(tzWrk, rWork.getName(), e.getValue())); - } - checkForCrossProduct(rWork.getName(), reducer, rsInfo); - } - } - } - - private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException { - ReduceWork rWrk = mrWrk.getReduceWork(); - if ( rWrk == null) { - return; - } - Operator reducer = rWrk.getReducer(); - if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { - BaseWork prntWork = mrWrk.getMapWork(); - checkForCrossProduct(taskName, reducer, - new ExtractReduceSinkInfo(null).analyze(prntWork)); - } - } - - private void checkForCrossProduct(String taskName, - Operator reducer, - Map rsInfo) { - if ( rsInfo.isEmpty() ) { - return; - } - Iterator it = 
rsInfo.values().iterator(); - ExtractReduceSinkInfo.Info info = it.next(); - if (info.keyCols.size() == 0) { - List iAliases = new ArrayList(); - iAliases.addAll(info.inputAliases); - while (it.hasNext()) { - info = it.next(); - iAliases.addAll(info.inputAliases); - } - String warning = String.format( - "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product", - reducer.toString(), - iAliases, - taskName); - warn(warning); - } - } - - private Map getReducerInfo(TezWork tzWrk, String vertex, String prntVertex) - throws SemanticException { - BaseWork prntWork = tzWrk.getWorkMap().get(prntVertex); - return new ExtractReduceSinkInfo(vertex).analyze(prntWork); - } - - /* - * Given a Work descriptor and the TaskName for the work - * this is responsible to check each MapJoinOp for cross products. - * The analyze call returns the warnings list. - *

- * For MR the taskname is the StageName, for Tez it is the vertex name. - */ - public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx { - - final List warnings; - final String taskName; - - MapJoinCheck(String taskName) { - this.taskName = taskName; - warnings = new ArrayList(); - } - - List analyze(BaseWork work) throws SemanticException { - Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName() - + "%"), this); - Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); - GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList topNodes = new ArrayList(); - topNodes.addAll(work.getAllRootOperators()); - ogw.startWalking(topNodes, null); - return warnings; - } - - @Override - public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - @SuppressWarnings("unchecked") - AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) nd; - MapJoinDesc mjDesc = mjOp.getConf(); - - String bigTablAlias = mjDesc.getBigTableAlias(); - if ( bigTablAlias == null ) { - Operator parent = null; - for(Operator op : mjOp.getParentOperators() ) { - if ( op instanceof TableScanOperator ) { - parent = op; - } - } - if ( parent != null) { - TableScanDesc tDesc = ((TableScanOperator)parent).getConf(); - bigTablAlias = tDesc.getAlias(); - } - } - bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias; - - List joinExprs = mjDesc.getKeys().values().iterator().next(); - - if ( joinExprs.size() == 0 ) { - warnings.add( - String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product", - mjOp.toString(), bigTablAlias, taskName)); - } - - return null; - } - } - - /* - * for a given Work Descriptor, it extracts information about the ReduceSinkOps - * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output - * vertex. - */ - public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx { - - static class Info { - List keyCols; - List inputAliases; - - Info(List keyCols, List inputAliases) { - this.keyCols = keyCols; - this.inputAliases = inputAliases == null ? new ArrayList() : inputAliases; - } - - Info(List keyCols, String[] inputAliases) { - this.keyCols = keyCols; - this.inputAliases = inputAliases == null ? new ArrayList() : Arrays.asList(inputAliases); - } - } - - final String outputTaskName; - final Map reduceSinkInfo; - - ExtractReduceSinkInfo(String parentTaskName) { - this.outputTaskName = parentTaskName; - reduceSinkInfo = new HashMap(); - } - - Map analyze(BaseWork work) throws SemanticException { - Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() - + "%"), this); - Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); - GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList topNodes = new ArrayList(); - topNodes.addAll(work.getAllRootOperators()); - ogw.startWalking(topNodes, null); - return reduceSinkInfo; - } - - @Override - public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, - Object... 
nodeOutputs) throws SemanticException { - ReduceSinkOperator rsOp = (ReduceSinkOperator) nd; - ReduceSinkDesc rsDesc = rsOp.getConf(); - - if ( outputTaskName != null ) { - String rOutputName = rsDesc.getOutputName(); - if ( rOutputName == null || !outputTaskName.equals(rOutputName)) { - return null; - } - } - - reduceSinkInfo.put(rsDesc.getTag(), - new Info(rsDesc.getKeyCols(), rsOp.getInputAliases())); - - return null; - } - } - - static class NoopProcessor implements NodeProcessor { - @Override - public final Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, - Object... nodeOutputs) throws SemanticException { - return nd; - } - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java new file mode 100644 index 0000000..24d0c07 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.plan.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.session.SessionState; + +/* + * Check each MapJoin and ShuffleJoin Operator to see if they are performing a cross product. + * If yes, output a warning to the Session's console. + * The Checks made are the following: + * 1. MR, Shuffle Join: + * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then + * this is a cross product. + * The parent ReduceSinkOp is in the MapWork for the same Stage. + * 2. MR, MapJoin: + * If the keys expr list on the mapJoin Desc is an empty list for any input, + * this implies a cross product. + * 3. Tez, Shuffle Join: + * Check the parent ReduceSinkOp of the JoinOp. If its keys list is size = 0, then + * this is a cross product. + * The parent ReduceSinkOp checked is based on the ReduceWork.tagToInput map on the + * reduceWork that contains the JoinOp. + * 4. Tez, Map Join: + * If the keys expr list on the mapJoin Desc is an empty list for any input, + * this implies a cross product. + */ +public class CrossProductHandler implements PhysicalPlanResolver, Dispatcher { + + protected static transient final Logger LOG = LoggerFactory + .getLogger(CrossProductHandler.class); + private Boolean cartesianProductEdgeEnabled = null; + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + cartesianProductEdgeEnabled = + HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED); + TaskGraphWalker ogw = new TaskGraphWalker(this); + + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getRootTasks()); + + ogw.startWalking(topNodes, null); + return pctx; + } + + @Override + public Object dispatch(Node nd, Stack stack, Object...
nodeOutputs) + throws SemanticException { + @SuppressWarnings("unchecked") + Task currTask = (Task) nd; + if (currTask instanceof MapRedTask) { + MapRedTask mrTsk = (MapRedTask)currTask; + MapredWork mrWrk = mrTsk.getWork(); + checkMapJoins(mrTsk); + checkMRReducer(currTask.toString(), mrWrk); + } else if (currTask instanceof ConditionalTask ) { + List> taskListInConditionalTask = + ((ConditionalTask) currTask).getListTasks(); + for(Task tsk: taskListInConditionalTask){ + dispatch(tsk, stack, nodeOutputs); + } + + } else if (currTask instanceof TezTask) { + TezTask tezTask = (TezTask) currTask; + TezWork tezWork = tezTask.getWork(); + checkMapJoins(tezWork); + checkTezReducer(tezWork); + } + return null; + } + + private void warn(String msg) { + SessionState.getConsole().getInfoStream().println( + String.format("Warning: %s", msg)); + } + + private void checkMapJoins(MapRedTask mrTsk) throws SemanticException { + MapredWork mrWrk = mrTsk.getWork(); + MapWork mapWork = mrWrk.getMapWork(); + List warnings = new MapJoinCheck(mrTsk.toString()).analyze(mapWork); + if (!warnings.isEmpty()) { + for (String w : warnings) { + warn(w); + } + } + ReduceWork redWork = mrWrk.getReduceWork(); + if (redWork != null) { + warnings = new MapJoinCheck(mrTsk.toString()).analyze(redWork); + if (!warnings.isEmpty()) { + for (String w : warnings) { + warn(w); + } + } + } + } + + private void checkMapJoins(TezWork tezWork) throws SemanticException { + for(BaseWork wrk : tezWork.getAllWork() ) { + + if ( wrk instanceof MergeJoinWork ) { + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + List warnings = new MapJoinCheck(wrk.getName()).analyze(wrk); + if ( !warnings.isEmpty() ) { + for(String w : warnings) { + warn(w); + } + } + } + } + + private void checkTezReducer(TezWork tezWork) throws SemanticException { + for(BaseWork wrk : tezWork.getAllWork() ) { + BaseWork origWrk = null; + + if ( wrk instanceof MergeJoinWork ) { + origWrk = wrk; + wrk = ((MergeJoinWork)wrk).getMainWork(); + } + + if ( !(wrk instanceof ReduceWork ) ) { + continue; + } + ReduceWork rWork = (ReduceWork) wrk; + Operator reducer = ((ReduceWork)wrk).getReducer(); + if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) { + boolean noOuterJoin = ((JoinDesc)reducer.getConf()).isNoOuterJoin(); + Map rsInfo = + new HashMap(); + for(Map.Entry e : rWork.getTagToInput().entrySet()) { + rsInfo.putAll(getReducerInfo(tezWork, rWork.getName(), e.getValue())); + } + if (checkForCrossProduct(rWork.getName(), reducer, rsInfo) + && cartesianProductEdgeEnabled && noOuterJoin) { + List parents = tezWork.getParents(null == origWrk ? wrk : origWrk); + for (BaseWork p: parents) { + TezEdgeProperty prop = tezWork.getEdgeProperty(p, null == origWrk ? 
wrk : origWrk); + LOG.info("Edge Type: "+prop.getEdgeType()); + if (prop.getEdgeType().equals(EdgeType.CUSTOM_SIMPLE_EDGE) + || prop.getEdgeType().equals(EdgeType.CUSTOM_EDGE)) { + prop.setEdgeType(EdgeType.XPROD_EDGE); + rWork.setNumReduceTasks(-1); + rWork.setMaxReduceTasks(-1); + rWork.setMinReduceTasks(-1); + } + } + } + } + } + + private void checkMRReducer(String taskName, MapredWork mrWrk) throws SemanticException { + ReduceWork rWrk = mrWrk.getReduceWork(); + if ( rWrk == null) { + return; + } + Operator reducer = rWrk.getReducer(); + if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) { + BaseWork parentWork = mrWrk.getMapWork(); + checkForCrossProduct(taskName, reducer, + new ExtractReduceSinkInfo(null).analyze(parentWork)); + } + } + + private boolean checkForCrossProduct(String taskName, + Operator reducer, + Map rsInfo) { + if ( rsInfo.isEmpty() ) { + return false; + } + Iterator it = rsInfo.values().iterator(); + ExtractReduceSinkInfo.Info info = it.next(); + if (info.keyCols.size() == 0) { + List iAliases = new ArrayList(); + iAliases.addAll(info.inputAliases); + while (it.hasNext()) { + info = it.next(); + iAliases.addAll(info.inputAliases); + } + String warning = String.format( + "Shuffle Join %s[tables = %s] in Stage '%s' is a cross product", + reducer.toString(), + iAliases, + taskName); + warn(warning); + return true; + } + return false; + } + + private Map getReducerInfo(TezWork tezWork, String vertex, String prntVertex) + throws SemanticException { + BaseWork parentWork = tezWork.getWorkMap().get(prntVertex); + return new ExtractReduceSinkInfo(vertex).analyze(parentWork); + } + + /* + * Given a Work descriptor and the TaskName for the work + * this is responsible for checking each MapJoinOp for cross products. + * The analyze call returns the warnings list. + *

+ * For MR the taskname is the StageName, for Tez it is the vertex name. + */ + public static class MapJoinCheck implements NodeProcessor, NodeProcessorCtx { + + final List warnings; + final String taskName; + + MapJoinCheck(String taskName) { + this.taskName = taskName; + warnings = new ArrayList(); + } + + List analyze(BaseWork work) throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName() + + "%"), this); + Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(work.getAllRootOperators()); + ogw.startWalking(topNodes, null); + return warnings; + } + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + @SuppressWarnings("unchecked") + AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) nd; + MapJoinDesc mjDesc = mjOp.getConf(); + + String bigTablAlias = mjDesc.getBigTableAlias(); + if ( bigTablAlias == null ) { + Operator parent = null; + for(Operator op : mjOp.getParentOperators() ) { + if ( op instanceof TableScanOperator ) { + parent = op; + } + } + if ( parent != null) { + TableScanDesc tDesc = ((TableScanOperator)parent).getConf(); + bigTablAlias = tDesc.getAlias(); + } + } + bigTablAlias = bigTablAlias == null ? "?" : bigTablAlias; + + List joinExprs = mjDesc.getKeys().values().iterator().next(); + + if ( joinExprs.size() == 0 ) { + warnings.add( + String.format("Map Join %s[bigTable=%s] in task '%s' is a cross product", + mjOp.toString(), bigTablAlias, taskName)); + } + + return null; + } + } + + /* + * for a given Work Descriptor, it extracts information about the ReduceSinkOps + * in the Work. For Tez, you can restrict it to ReduceSinks for a particular output + * vertex. + */ + public static class ExtractReduceSinkInfo implements NodeProcessor, NodeProcessorCtx { + + static class Info { + List keyCols; + List inputAliases; + + Info(List keyCols, List inputAliases) { + this.keyCols = keyCols; + this.inputAliases = inputAliases == null ? new ArrayList() : inputAliases; + } + + Info(List keyCols, String[] inputAliases) { + this.keyCols = keyCols; + this.inputAliases = inputAliases == null ? new ArrayList() : Arrays.asList(inputAliases); + } + } + + final String outputTaskName; + final Map reduceSinkInfo; + + ExtractReduceSinkInfo(String parentTaskName) { + this.outputTaskName = parentTaskName; + reduceSinkInfo = new HashMap(); + } + + Map analyze(BaseWork work) throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + + "%"), this); + Dispatcher disp = new DefaultRuleDispatcher(new NoopProcessor(), opRules, this); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(work.getAllRootOperators()); + ogw.startWalking(topNodes, null); + return reduceSinkInfo; + } + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + ReduceSinkOperator rsOp = (ReduceSinkOperator) nd; + ReduceSinkDesc rsDesc = rsOp.getConf(); + + if ( outputTaskName != null ) { + String rOutputName = rsDesc.getOutputName(); + if ( rOutputName == null || !outputTaskName.equals(rOutputName)) { + return null; + } + } + + reduceSinkInfo.put(rsDesc.getTag(), + new Info(rsDesc.getKeyCols(), rsOp.getInputAliases())); + + return null; + } + } + + static class NoopProcessor implements NodeProcessor { + @Override + public final Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + return nd; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index 9377563..c040406 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -82,7 +82,7 @@ private void initialize(HiveConf hiveConf) { } if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) { - resolvers.add(new CrossProductCheck()); + resolvers.add(new CrossProductHandler()); } // Vectorization should be the last optimization, because it doesn't modify the plan diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java index 92d2191..52c5b64 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java @@ -93,10 +93,10 @@ private void checkShuffleJoin(SparkWork sparkWork) throws SemanticException { for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) { Operator reducer = reduceWork.getReducer(); if (reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator) { - Map rsInfo = - new HashMap(); + Map rsInfo = + new HashMap(); for (BaseWork parent : sparkWork.getParents(reduceWork)) { - rsInfo.putAll(new CrossProductCheck.ExtractReduceSinkInfo(null).analyze(parent)); + rsInfo.putAll(new CrossProductHandler.ExtractReduceSinkInfo(null).analyze(parent)); } checkForCrossProduct(reduceWork.getName(), reducer, rsInfo); } @@ -107,7 +107,7 @@ private void checkMapJoin(SparkTask sparkTask) throws SemanticException { SparkWork sparkWork = sparkTask.getWork(); for (BaseWork baseWork : sparkWork.getAllWorkUnsorted()) { List warnings = - new CrossProductCheck.MapJoinCheck(sparkTask.toString()).analyze(baseWork); + new CrossProductHandler.MapJoinCheck(sparkTask.toString()).analyze(baseWork); for (String w : warnings) { warn(w); } @@ -116,12 +116,12 @@ private void checkMapJoin(SparkTask sparkTask) throws SemanticException { private void checkForCrossProduct(String workName, Operator reducer, - Map rsInfo) { + Map rsInfo) { if (rsInfo.isEmpty()) { return; } - Iterator it = rsInfo.values().iterator(); - CrossProductCheck.ExtractReduceSinkInfo.Info info = it.next(); + Iterator it = rsInfo.values().iterator(); + CrossProductHandler.ExtractReduceSinkInfo.Info info = it.next(); if (info.keyCols.size() == 0) { List iAliases = new ArrayList(); iAliases.addAll(info.inputAliases); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 08a8f00..b346017 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -289,7 +289,7 @@ public void compile(final ParseContext pCtx, final List inputs, perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); // run the optimizations that use stats for optimization - runStatsDependentOptimizations(procCtx, inputs, outputs); + runStatsDependentOptimizations(procCtx, inputs, outputs); // set reducer parallelism perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run the optimizations that use stats for optimization"); // Removing semijoin optimization when it may not be beneficial @@ -569,7 +569,7 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CHECK_CROSS_PRODUCT)) { - physicalCtx = new CrossProductCheck().resolve(physicalCtx); + physicalCtx = new CrossProductHandler().resolve(physicalCtx); } else { LOG.debug("Skipping cross product analysis"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index 5d7ddc8..0264d1b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -28,6 +28,7 @@ CONTAINS, CUSTOM_EDGE, CUSTOM_SIMPLE_EDGE, + XPROD_EDGE } private HiveConf hiveConf; @@ -102,4 +103,8 @@ public boolean isSlowStart() { public void setSlowStart(boolean slowStart) { this.isSlowStart = slowStart; } + + public void setEdgeType(EdgeType type) { + this.edgeType = type; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 2b52056..227835b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -115,8 +115,8 @@ public Vertex answer(InvocationOnMock invocation) throws Throwable { }); when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(Vertex.class), - any(TezEdgeProperty.class), any(VertexType.class))).thenAnswer(new Answer() { - + any(TezEdgeProperty.class), any(BaseWork.class), any(TezWork.class))) + .thenAnswer(new Answer() { @Override public Edge answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); diff --git ql/src/test/queries/clientpositive/cross_prod_1.q ql/src/test/queries/clientpositive/cross_prod_1.q new file mode 100644 index 0000000..89bab24 --- /dev/null +++ ql/src/test/queries/clientpositive/cross_prod_1.q @@ -0,0 +1,34 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.tez.cartesian-product.enabled=true; + +create table X as +select distinct * from src order by key limit 10; + +explain select * from X as A, X as B; +select * from X as A, X as B; + +explain select * from X as A join X as B on A.key