diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java index 71d060a295..32050ad698 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.*; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -86,8 +87,21 @@ @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + HiveConf conf = pctx.getConf(); cartesianProductEdgeEnabled = - HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED); + HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED); + // if max parallelism isn't set by user in llap mode, set it to number of executors + if (cartesianProductEdgeEnabled + && HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap") + && conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM) == null) { + LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf); + llapInfo.initClusterInfo(); + if (llapInfo.hasClusterInfo()) { + conf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, + llapInfo.getKnownExecutorCount()); + } + } + TaskGraphWalker ogw = new TaskGraphWalker(this); ArrayList topNodes = new ArrayList();