Index: pom.xml
===================================================================
--- pom.xml (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ pom.xml (working copy)
@@ -100,7 +100,7 @@
<antlr.version>3.4</antlr.version>
<avro.version>1.7.5</avro.version>
<bonecp.version>0.8.0.RELEASE</bonecp.version>
- <calcite.version>1.1.0-incubating</calcite.version>
+ <calcite.version>1.2.0-incubating-SNAPSHOT</calcite.version>
<datanucleus-api-jdo.version>3.2.6</datanucleus-api-jdo.version>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
<datanucleus-rdbms.version>3.2.9</datanucleus-rdbms.version>
Property changes on: hbase-handler/pom.xml
Property changes on: hbase-handler/pom.xml
___________________________________________________________________
Modified: svn:mergeinfo
Reverse-merged /hive/branches/cbo/hbase-handler/pom.xml:r1605012-1627125
Merged /hive/trunk/hbase-handler/pom.xml:r1605012-1660746
Index: metastore/bin/.gitignore
===================================================================
--- metastore/bin/.gitignore (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ metastore/bin/.gitignore (working copy)
@@ -1 +1 @@
-# Dummy file to make Git recognize this empty directory
+/src/
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -702,6 +702,9 @@
// CBO related
HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."),
+ HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control converting the optimized Calcite plan into the Hive operator tree"),
+ EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on "
+ + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
// hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
// need to remove by hive .13. Also, do not change default (see SMB operator)
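For illustration, a minimal sketch of how the two new flags are consulted at planning time. The flag names come from the hunk above; the surrounding call site and the `conf` variable (a HiveConf) are assumptions, not part of this patch:

    // Hypothetical call site inside a component holding a HiveConf.
    boolean returnPath = conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP);
    boolean extendedCost = conf.getBoolVar(HiveConf.ConfVars.EXTENDED_COST_MODEL);
    if (returnPath) {
      // Translate the optimized Calcite plan directly into Hive operators
      // instead of unparsing it back into an AST (see CalcitePlanner below).
    }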
Index: ql/src/test/queries/clientpositive/cbo_join.q
===================================================================
--- ql/src/test/queries/clientpositive/cbo_join.q (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/test/queries/clientpositive/cbo_join.q (working copy)
@@ -4,6 +4,7 @@
set hive.stats.fetch.column.stats=true;
set hive.auto.convert.join=false;
+-- SORT_QUERY_RESULTS
-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key;
select cbo_t1.key from cbo_t1 join cbo_t3;
Index: ql/src/test/results/clientpositive/cbo_join.q.out
===================================================================
--- ql/src/test/results/clientpositive/cbo_join.q.out (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/test/results/clientpositive/cbo_join.q.out (working copy)
@@ -1,4 +1,5 @@
-PREHOOK: query: -- 4. Test Select + Join + TS
+PREHOOK: query: -- SORT_QUERY_RESULTS
+-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -6,7 +7,8 @@
PREHOOK: Input: default@cbo_t2
PREHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-POSTHOOK: query: -- 4. Test Select + Join + TS
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@cbo_t1
@@ -122,46 +124,6 @@
POSTHOOK: Input: default@cbo_t1@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
1
1
1
@@ -522,6 +484,46 @@
1
1
1
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
PREHOOK: query: select cbo_t1.key from cbo_t1 join cbo_t3 where cbo_t1.key=cbo_t3.key and cbo_t1.key >= 1
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -632,8 +634,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -730,6 +730,8 @@
1 1
1 1
1 1
+NULL NULL
+NULL NULL
PREHOOK: query: select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 right outer join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -744,8 +746,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -847,6 +847,8 @@
NULL 2
NULL 2
NULL 2
+NULL NULL
+NULL NULL
PREHOOK: query: select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 full outer join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -861,10 +863,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -966,6 +964,10 @@
NULL 2
NULL 2
NULL 2
+NULL NULL
+NULL NULL
+NULL NULL
+NULL NULL
PREHOOK: query: select b, cbo_t1.c, cbo_t2.p, q, cbo_t3.c_int from (select key as a, c_int as b, cbo_t1.c_float as c from cbo_t1) cbo_t1 join (select cbo_t2.key as p, cbo_t2.c_int as q, c_float as r from cbo_t2) cbo_t2 on cbo_t1.a=p join cbo_t3 on cbo_t1.a=key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -5334,8 +5336,6 @@
POSTHOOK: Input: default@cbo_t2@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL NULL NULL NULL
-NULL NULL NULL NULL
1 1 1 1
1 1 1 1
1 1 1 1
@@ -5870,6 +5870,8 @@
NULL NULL NULL NULL
NULL NULL NULL NULL
NULL NULL NULL NULL
+NULL NULL NULL NULL
+NULL NULL NULL NULL
PREHOOK: query: select b, cbo_t1.c, cbo_t2.p, q, cbo_t3.c_int from (select key as a, c_int as b, cbo_t1.c_float as c from cbo_t1) cbo_t1 full outer join (select cbo_t2.key as p, cbo_t2.c_int as q, c_float as r from cbo_t2) cbo_t2 on cbo_t1.a=p join cbo_t3 on cbo_t1.a=key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -6430,8 +6432,6 @@
POSTHOOK: Input: default@cbo_t2@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL NULL NULL NULL
-NULL NULL NULL NULL
1 1 1 1
1 1 1 1
1 1 1 1
@@ -6966,6 +6966,8 @@
NULL NULL NULL NULL
NULL NULL NULL NULL
NULL NULL NULL NULL
+NULL NULL NULL NULL
+NULL NULL NULL NULL
PREHOOK: query: -- 5. Test Select + Join + FIL + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key where (cbo_t1.c_int + cbo_t2.c_int == 2) and (cbo_t1.c_int > 0 or cbo_t2.c_float >= 0)
PREHOOK: type: QUERY
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java (working copy)
@@ -38,7 +38,7 @@
* SemanticAnalyzer.saveViewDefinition() calls TokenRewriteStream.toString().
*
*/
-class UnparseTranslator {
+public class UnparseTranslator {
// key is token start index
private final NavigableMap<Integer, Translation> translations;
private final List<CopyTranslation> copyTranslations;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -229,9 +229,9 @@
private HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner;
private HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
- private HashMap<String, Operator<? extends OperatorDesc>> topOps;
- private final HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
- private final LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+ protected HashMap<String, Operator<? extends OperatorDesc>> topOps;
+ private HashMap<String, Operator<? extends OperatorDesc>> topSelOps;
+ protected LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
private final Map<JoinOperator, QBJoinTree> joinContext;
@@ -258,7 +258,7 @@
private CreateViewDesc createVwDesc;
private ArrayList<String> viewsExpanded;
private ASTNode viewSelect;
- private final UnparseTranslator unparseTranslator;
+ protected final UnparseTranslator unparseTranslator;
private final GlobalLimitCtx globalLimitCtx;
// prefix for column names auto generated by hive
@@ -478,7 +478,7 @@
wExprsInDest.containsKey(wFnSpec.getExpression().toStringTree())) {
continue;
}
- wFnSpec.setAlias("_wcol" + wColIdx);
+ wFnSpec.setAlias(wFnSpec.getName() + "_window_" + wColIdx);
spec.addWindowFunction(wFnSpec);
qb.getParseInfo().addWindowingExprToClause(dest, wFnSpec.getExpression());
}
@@ -3448,7 +3448,7 @@
return ret;
}
- private int setBit(int bitmap, int bitIdx) {
+ public static int setBit(int bitmap, int bitIdx) {
return bitmap | (1 << bitIdx);
}
@@ -3984,10 +3984,10 @@
/**
* Class to store GenericUDAF related information.
*/
- static class GenericUDAFInfo {
- ArrayList<ExprNodeDesc> convertedParameters;
- GenericUDAFEvaluator genericUDAFEvaluator;
- TypeInfo returnType;
+ public static class GenericUDAFInfo {
+ public ArrayList<ExprNodeDesc> convertedParameters;
+ public GenericUDAFEvaluator genericUDAFEvaluator;
+ public TypeInfo returnType;
}
/**
@@ -4028,7 +4028,7 @@
* Returns the GenericUDAFEvaluator for the aggregation. This is called once
* for each GroupBy aggregation.
*/
- static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName,
+ public static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName,
ArrayList<ExprNodeDesc> aggParameters, ASTNode aggTree,
boolean isDistinct, boolean isAllColumns)
throws SemanticException {
@@ -4058,7 +4058,7 @@
* @throws SemanticException
* when the UDAF is not found or has problems.
*/
- static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator,
+ public static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator,
GenericUDAFEvaluator.Mode emode, ArrayList<ExprNodeDesc> aggParameters)
throws SemanticException {
@@ -4087,7 +4087,7 @@
return r;
}
- static GenericUDAFEvaluator.Mode groupByDescModeToUDAFMode(
+ public static GenericUDAFEvaluator.Mode groupByDescModeToUDAFMode(
GroupByDesc.Mode mode, boolean isDistinct) {
switch (mode) {
case COMPLETE:
@@ -4130,7 +4130,7 @@
* @return the ExprNodeDesc of the constant parameter if the given internalName represents
* a constant parameter; otherwise, return null
*/
- private ExprNodeDesc isConstantParameterInAggregationParameters(String internalName,
+ public static ExprNodeDesc isConstantParameterInAggregationParameters(String internalName,
List<ExprNodeDesc> reduceValues) {
// only the pattern of "VALUE._col([0-9]+)" should be handled.
@@ -5577,7 +5577,7 @@
return false;
}
- private void checkExpressionsForGroupingSet(List<ASTNode> grpByExprs,
+ void checkExpressionsForGroupingSet(List<ASTNode> grpByExprs,
List<ASTNode> distinctGrpByExprs,
Map<String, ASTNode> aggregationTrees,
RowResolver inputRowResolver) throws SemanticException {
@@ -6131,7 +6131,7 @@
}
@SuppressWarnings("nls")
- private Operator genFileSinkPlan(String dest, QB qb, Operator input)
+ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
RowResolver inputRR = opParseCtx.get(input).getRowResolver();
@@ -9234,7 +9234,7 @@
return equalsExpr;
}
- private String getAliasId(String alias, QB qb) {
+ protected String getAliasId(String alias, QB qb) {
return (qb.getId() == null ? alias : qb.getId() + ":" + alias).toLowerCase();
}
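Since setBit becomes public static here, callers outside SemanticAnalyzer can build grouping-set bitmaps with it. A tiny worked example of the arithmetic it performs (values illustrative):

    int bitmap = 0;
    bitmap = SemanticAnalyzer.setBit(bitmap, 0); // 0 | (1 << 0) = 1  (binary 001)
    bitmap = SemanticAnalyzer.setBit(bitmap, 2); // 1 | (1 << 2) = 5  (binary 101)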
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java (working copy)
@@ -49,8 +49,8 @@
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
@@ -58,8 +58,10 @@
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
@@ -117,6 +119,7 @@
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfigContext;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -135,8 +138,11 @@
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter;
@@ -175,6 +181,7 @@
import com.google.common.collect.Lists;
public class CalcitePlanner extends SemanticAnalyzer {
+
private final AtomicInteger noColsMissingStats = new AtomicInteger(0);
private List<FieldSchema> topLevelFieldSchema;
private SemanticException semanticException;
@@ -218,13 +225,16 @@
if (cboCtx.type == PreCboCtx.Type.CTAS) {
queryForCbo = cboCtx.nodeOfInterest; // nodeOfInterest is the query
}
- runCBO = canHandleAstForCbo(queryForCbo, getQB(), cboCtx);
+ runCBO = canCBOHandleAst(queryForCbo, getQB(), cboCtx);
if (runCBO) {
disableJoinMerge = true;
boolean reAnalyzeAST = false;
try {
+ if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+ sinkOp = getOptimizedHiveOPDag();
+ } else {
// 1. Gen Optimized AST
ASTNode newAST = getOptimizedAST();
@@ -252,6 +262,7 @@
LOG.info("CBO Succeeded; optimized logical plan.");
this.ctx.setCboInfo("Plan optimized by CBO.");
LOG.debug(newAST.dump());
+ }
} catch (Exception e) {
boolean isMissingStats = noColsMissingStats.get() > 0;
if (isMissingStats) {
@@ -324,7 +335,7 @@
* If top level QB is query then everything below it must also be
* Query.
*/
- boolean canHandleAstForCbo(ASTNode ast, QB qb, PreCboCtx cboCtx) {
+ boolean canCBOHandleAst(ASTNode ast, QB qb, PreCboCtx cboCtx) {
int root = ast.getToken().getType();
boolean needToLogMessage = STATIC_LOG.isInfoEnabled();
boolean isSupportedRoot = root == HiveParser.TOK_QUERY || root == HiveParser.TOK_EXPLAIN
@@ -598,6 +609,57 @@
return optiqOptimizedAST;
}
+ /**
+ * Get Optimized Hive Operator DAG for the given QB tree in the semAnalyzer.
+ *
+ * @return Optimized Hive operator tree
+ * @throws SemanticException
+ */
+ Operator getOptimizedHiveOPDag() throws SemanticException {
+ RelNode optimizedOptiqPlan = null;
+ CalcitePlannerAction calcitePlannerAction = new CalcitePlannerAction(prunedPartitions);
+
+ try {
+ optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
+ .newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build());
+ } catch (Exception e) {
+ rethrowCalciteException(e);
+ throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage());
+ }
+
+ RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan);
+
+ Operator<?> hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps,
+ conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(modifiedOptimizedOptiqPlan);
+ RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB());
+ opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR));
+ return genFileSinkPlan(getQB().getParseInfo().getClauseNames().iterator().next(), getQB(), hiveRoot);
+ }
+
+ private RelNode introduceProjectIfNeeded(RelNode optimizedOptiqPlan)
+ throws CalciteSemanticException {
+ RelNode parent = null;
+ RelNode input = optimizedOptiqPlan;
+ RelNode newRoot = optimizedOptiqPlan;
+
+ while (!(input instanceof Project) && (input instanceof Sort)) {
+ parent = input;
+ input = input.getInput(0);
+ }
+
+ if (!(input instanceof Project)) {
+ HiveProject hpRel = HiveProject.create(input,
+ HiveCalciteUtil.getProjsFromBelowAsInputRef(input), input.getRowType().getFieldNames());
+ if (input == optimizedOptiqPlan) {
+ newRoot = hpRel;
+ } else {
+ parent.replaceInput(0, hpRel);
+ }
+ }
+
+ return newRoot;
+ }
+
/***
* Unwraps Calcite Invocation exceptions coming meta data provider chain and
* obtains the real cause.
@@ -674,6 +736,24 @@
|| t instanceof UndeclaredThrowableException;
}
+ private RowResolver genRowResolver(Operator op, QB qb) {
+ RowResolver rr = new RowResolver();
+ String subqAlias = (qb.getAliases().size() == 1 && qb.getSubqAliases().size() == 1) ? qb
+ .getAliases().get(0) : null;
+
+ for (ColumnInfo ci : op.getSchema().getSignature()) {
+ try {
+ rr.putWithCheck((subqAlias != null) ? subqAlias : ci.getTabAlias(),
+ ci.getAlias() != null ? ci.getAlias() : ci.getInternalName(), ci.getInternalName(),
+ new ColumnInfo(ci));
+ } catch (SemanticException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return rr;
+ }
+
/**
* Code responsible for Calcite plan generation and optimization.
*/
@@ -700,7 +780,8 @@
/*
* recreate cluster, so that it picks up the additional traitDef
*/
- RelOptPlanner planner = HiveVolcanoPlanner.createPlanner();
+ HiveConfigContext confContext = new HiveConfigContext(conf);
+ RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext);
final RelOptQuery query = new RelOptQuery(planner);
final RexBuilder rexBuilder = cluster.getRexBuilder();
cluster = query.createCluster(rexBuilder.getTypeFactory(), rexBuilder);
@@ -719,13 +800,16 @@
throw new RuntimeException(e);
}
+ // Create MD provider
+ HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
+
// 2. Apply Pre Join Order optimizations
calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
- HiveDefaultRelMetadataProvider.INSTANCE);
+ mdProvider.getMetadataProvider());
// 3. Appy Join Order Optimizations using Hep Planner (MST Algorithm)
List<RelMetadataProvider> list = Lists.newArrayList();
- list.add(HiveDefaultRelMetadataProvider.INSTANCE);
+ list.add(mdProvider.getMetadataProvider());
RelTraitSet desiredTraits = cluster
.traitSetOf(HiveRelNode.CONVENTION, RelCollations.EMPTY);
@@ -758,6 +842,18 @@
calciteOptimizedPlan = hepPlanner.findBestExp();
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+ // run rules to aid in translation from Optiq tree -> Hive tree
+ hepPgm = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .addRuleInstance(new HiveInsertExchange4JoinRule()).build();
+ hepPlanner = new HepPlanner(hepPgm);
+
+ hepPlanner.registerMetadataProviders(list);
+ cluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+ hepPlanner.setRoot(calciteOptimizedPlan);
+ calciteOptimizedPlan = hepPlanner.findBestExp();
+ }
+
if (LOG.isDebugEnabled() && !conf.getBoolVar(ConfVars.HIVE_IN_TEST)) {
LOG.debug("CBO Planning details:\n");
LOG.debug("Original Plan:\n" + RelOptUtil.toString(calciteGenPlan));
@@ -789,7 +885,12 @@
basePlan = hepPlan(basePlan, true, mdProvider, SemiJoinJoinTransposeRule.INSTANCE,
SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE);
- // 2. PPD
+ // 2. Add not null filters
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+ basePlan = hepPlan(basePlan, true, mdProvider, HiveJoinAddNotNullRule.INSTANCE);
+ }
+
+ // 3. PPD
basePlan = hepPlan(basePlan, true, mdProvider,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
@@ -802,19 +903,19 @@
HiveFilterJoinRule.FILTER_ON_JOIN, new FilterAggregateTransposeRule(Filter.class,
HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class));
- // 3. Transitive inference & Partition Pruning
+ // 4. Transitive inference & Partition Pruning
basePlan = hepPlan(basePlan, false, mdProvider, new JoinPushTransitivePredicatesRule(
Join.class, HiveFilter.DEFAULT_FILTER_FACTORY),
new HivePartitionPruneRule(conf));
- // 4. Projection Pruning
+ // 5. Projection Pruning
RelFieldTrimmer fieldTrimmer = new RelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY,
HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY,
RelFactories.DEFAULT_SEMI_JOIN_FACTORY, HiveSort.HIVE_SORT_REL_FACTORY,
HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY);
basePlan = fieldTrimmer.trim(basePlan);
- // 5. Rerun PPD through Project as column pruning would have introduced DT
+ // 6. Rerun PPD through Project as column pruning would have introduced DT
// above scans
basePlan = hepPlan(basePlan, true, mdProvider,
new FilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,
@@ -1186,7 +1287,7 @@
}
// 2. Get Table Metadata
- Table tab = qb.getMetaData().getSrcForAlias(tableAlias);
+ Table tabMetaData = qb.getMetaData().getSrcForAlias(tableAlias);
// 3. Get Table Logical Schema (Row Type)
// NOTE: Table logical schema = Non Partition Cols + Partition Cols +
@@ -1194,7 +1295,7 @@
// 3.1 Add Column info for non partion cols (Object Inspector fields)
@SuppressWarnings("deprecation")
- StructObjectInspector rowObjectInspector = (StructObjectInspector) tab.getDeserializer()
+ StructObjectInspector rowObjectInspector = (StructObjectInspector) tabMetaData.getDeserializer()
.getObjectInspector();
List<? extends StructField> fields = rowObjectInspector.getAllStructFieldRefs();
ColumnInfo colInfo;
@@ -1216,7 +1317,7 @@
ArrayList<ColumnInfo> partitionColumns = new ArrayList<ColumnInfo>();
// 3.2 Add column info corresponding to partition columns
- for (FieldSchema part_col : tab.getPartCols()) {
+ for (FieldSchema part_col : tabMetaData.getPartCols()) {
colName = part_col.getName();
colInfo = new ColumnInfo(colName,
TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), tableAlias, true);
@@ -1226,6 +1327,7 @@
}
// 3.3 Add column info corresponding to virtual columns
+ List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>();
Iterator<VirtualColumn> vcs = VirtualColumn.getRegistry(conf).iterator();
while (vcs.hasNext()) {
VirtualColumn vc = vcs.next();
@@ -1233,24 +1335,26 @@
vc.getIsHidden());
rr.put(tableAlias, vc.getName(), colInfo);
cInfoLst.add(colInfo);
+ virtualCols.add(vc);
}
// 3.4 Build row type from field
RelDataType rowType = TypeConverter.getType(cluster, rr, null);
// 4. Build RelOptAbstractTable
- String fullyQualifiedTabName = tab.getDbName();
- if (fullyQualifiedTabName != null && !fullyQualifiedTabName.isEmpty())
- fullyQualifiedTabName = fullyQualifiedTabName + "." + tab.getTableName();
- else
- fullyQualifiedTabName = tab.getTableName();
+ String fullyQualifiedTabName = tabMetaData.getDbName();
+ if (fullyQualifiedTabName != null && !fullyQualifiedTabName.isEmpty()) {
+ fullyQualifiedTabName = fullyQualifiedTabName + "." + tabMetaData.getTableName();
+ } else {
+ fullyQualifiedTabName = tabMetaData.getTableName();
+ }
RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
- tableAlias, rowType, tab, nonPartitionColumns, partitionColumns, conf, partitionCache,
- noColsMissingStats);
+ rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
+ partitionCache, noColsMissingStats, getAliasId(tableAlias, qb));
// 5. Build Hive Table Scan Rel
- tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
- rowType);
+ tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
+ null == tableAlias ? tabMetaData.getTableName() : tableAlias);
// 6. Add Schema(RR) to RelNode-Schema map
ImmutableMap<String, Integer> hiveToCalciteColMap = buildHiveToCalciteColumnMap(rr,
@@ -1747,10 +1851,40 @@
private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException {
RelNode gbRel = null;
QBParseInfo qbp = getQBParseInfo(qb);
+ // NOTE: Multi Insert is not supported
+ String detsClauseName = qbp.getClauseNames().iterator().next();
+ List<ASTNode> grpByAstExprs = SemanticAnalyzer.getGroupByForClause(qbp, detsClauseName);
+ HashMap<String, ASTNode> aggregationTrees = qbp.getAggregationExprsForClause(detsClauseName);
+ boolean cubeRollupGrpSetPresent = (!qbp.getDestRollups().isEmpty()
+ || !qbp.getDestGroupingSets().isEmpty() || !qbp.getDestCubes().isEmpty());
+ // 0. Sanity check
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
+ && qbp.getDistinctFuncExprsForClause(detsClauseName).size() > 1) {
+ throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg());
+ }
+ if (cubeRollupGrpSetPresent) {
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg());
+ }
+
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ checkExpressionsForGroupingSet(grpByAstExprs, qb.getParseInfo()
+ .getDistinctFuncExprsForClause(detsClauseName), aggregationTrees,
+ this.relToHiveRR.get(srcRel));
+
+ if (qbp.getDestGroupingSets().size() > conf
+ .getIntVar(HiveConf.ConfVars.HIVE_NEW_JOB_GROUPING_SET_CARDINALITY)) {
+ String errorMsg = "The number of rows per input row due to grouping sets is "
+ + qbp.getDestGroupingSets().size();
+ throw new SemanticException(
+ ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_SKEW.getMsg(errorMsg));
+ }
+ }
+ }
+
// 1. Gather GB Expressions (AST) (GB + Aggregations)
- // NOTE: Multi Insert is not supported
- String detsClauseName = qbp.getClauseNames().iterator().next();
// Check and transform group by *. This will only happen for select distinct *.
// Here the "genSelectPlan" is being leveraged.
// The main benefits are (1) remove virtual columns that should
@@ -1768,8 +1902,6 @@
qbp.setSelExprForClause(detsClauseName, SemanticAnalyzer.genSelectDIAST(rr));
}
}
- List<ASTNode> grpByAstExprs = SemanticAnalyzer.getGroupByForClause(qbp, detsClauseName);
- HashMap<String, ASTNode> aggregationTrees = qbp.getAggregationExprsForClause(detsClauseName);
boolean hasGrpByAstExprs = (grpByAstExprs != null && !grpByAstExprs.isEmpty()) ? true : false;
boolean hasAggregationTrees = (aggregationTrees != null && !aggregationTrees.isEmpty()) ? true
: false;
@@ -1802,9 +1934,7 @@
// 4. GroupingSets, Cube, Rollup
int groupingColsSize = gbExprNDescLst.size();
List<Integer> groupingSets = null;
- if (!qbp.getDestRollups().isEmpty()
- || !qbp.getDestGroupingSets().isEmpty()
- || !qbp.getDestCubes().isEmpty()) {
+ if (cubeRollupGrpSetPresent) {
if (qbp.getDestRollups().contains(detsClauseName)) {
groupingSets = getGroupingSetsForRollup(grpByAstExprs.size());
} else if (qbp.getDestCubes().contains(detsClauseName)) {
@@ -2250,15 +2380,27 @@
}
}
- return genSelectRelNode(projsForWindowSelOp, out_rwsch, srcRel);
+ return genSelectRelNode(projsForWindowSelOp, out_rwsch, srcRel, windowExpressions);
}
private RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
RelNode srcRel) throws CalciteSemanticException {
+ return genSelectRelNode(calciteColLst, out_rwsch, srcRel, null);
+ }
+
+ private RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
+ RelNode srcRel, List<WindowExpressionSpec> windowExpressions) throws CalciteSemanticException {
// 1. Build Column Names
Set<String> colNamesSet = new HashSet<String>();
List<ColumnInfo> cInfoLst = out_rwsch.getRowSchema().getSignature();
ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, String> windowToAlias = null;
+ if (windowExpressions != null) {
+ windowToAlias = new HashMap<String, String>();
+ for (WindowExpressionSpec wes : windowExpressions) {
+ windowToAlias.put(wes.getExpression().toStringTree().toLowerCase(), wes.getAlias());
+ }
+ }
String[] qualifiedColNames;
String tmpColAlias;
for (int i = 0; i < calciteColLst.size(); i++) {
@@ -2276,8 +2418,11 @@
* the names so we don't run into this issue when converting back to
* Hive AST.
*/
- if (tmpColAlias.startsWith("_c"))
+ if (tmpColAlias.startsWith("_c")) {
tmpColAlias = "_o_" + tmpColAlias;
+ } else if (windowToAlias != null && windowToAlias.containsKey(tmpColAlias)) {
+ tmpColAlias = windowToAlias.get(tmpColAlias);
+ }
int suffix = 1;
while (colNamesSet.contains(tmpColAlias)) {
tmpColAlias = qualifiedColNames[1] + suffix;
@@ -2769,4 +2914,5 @@
return tabAliases;
}
}
+
}
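To make the behavior of introduceProjectIfNeeded concrete, here is how it rewrites two plan shapes (operator names illustrative): it walks through any chain of Sort nodes at the root and, if no Project terminates that chain, inserts an identity HiveProject.

    // Root is a Sort with no Project below it:
    //   HiveSort(HiveTableScan)  ->  HiveSort(HiveProject(HiveTableScan))
    // Root is neither a Sort nor a Project:
    //   HiveJoin(a, b)           ->  HiveProject(HiveJoin(a, b))
    // A Project already at the root (or directly under the Sorts): plan unchanged.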
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (working copy)
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
/**
@@ -102,6 +103,14 @@
return tableNames;
}
+ public List<String> getColumnNames() {
+ List<String> columnNames = new ArrayList<String>();
+ for (ColumnInfo var : this.signature) {
+ columnNames.add(var.getInternalName());
+ }
+ return columnNames;
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof RowSchema) || (obj == null)) {
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy)
@@ -531,14 +531,15 @@
Operator<? extends OperatorDesc> child = op.getChildOperators().get(0);
- List<String> childCols;
+ List<String> childCols = null;
if (child instanceof CommonJoinOperator) {
- childCols = cppCtx.getJoinPrunedColLists().get(child)
+ childCols = cppCtx.getJoinPrunedColLists().get(child) == null
+ ? null : cppCtx.getJoinPrunedColLists().get(child)
.get((byte) conf.getTag());
} else {
childCols = cppCtx.getPrunedColList(child);
+ }
- }
List<ExprNodeDesc> valCols = conf.getValueCols();
List<String> valColNames = conf.getOutputValueColumnNames();
@@ -749,6 +750,7 @@
conf.setOutputColumnNames(newOutputColumnNames);
handleChildren(op, cols, cppCtx);
}
+
return null;
}
@@ -971,12 +973,12 @@
.getChildOperators();
LOG.info("JOIN " + op.getIdentifier() + " oldExprs: " + conf.getExprs());
+
List<String> childColLists = cppCtx.genColLists(op);
if (childColLists == null) {
return;
}
-
Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
for (byte tag : conf.getTagOrder()) {
prunedColLists.put(tag, new ArrayList<String>());
@@ -1076,6 +1078,7 @@
}
LOG.info("JOIN " + op.getIdentifier() + " newExprs: " + conf.getExprs());
+
op.setColumnExprMap(newColExprMap);
conf.setOutputColumnNames(outputCols);
op.getSchema().setSignature(rs);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (working copy)
@@ -242,4 +242,4 @@
return null;
}
}
-}
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy)
@@ -142,7 +142,9 @@
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
transformations.add(new ReduceSinkDeDuplication());
}
+ if(!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
transformations.add(new NonBlockingOpDeDupProc());
+ }
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEIDENTITYPROJECTREMOVER)) {
transformations.add(new IdentityProjectRemover());
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java (revision 1672450)
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
+
+public class HiveRelDistribution implements RelDistribution {
+
+ List<Integer> keys;
+ RelDistribution.Type type;
+
+ public HiveRelDistribution(Type type, List<Integer> keys) {
+ this.type = type;
+ this.keys = keys;
+ }
+
+ @Override
+ public RelTraitDef<?> getTraitDef() {
+ return RelDistributionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ if (trait == this) {
+ return true;
+ }
+ switch (((RelDistribution)trait).getType()) {
+ case HASH_DISTRIBUTED :
+ return this.getKeys().equals(((RelDistribution)trait).getKeys());
+ default:
+ throw new RuntimeException("Other distributions are not used yet.");
+ }
+ }
+
+ @Override
+ public RelDistribution apply(TargetMapping mapping) {
+ if (keys.isEmpty()) {
+ return this;
+ }
+ return new HiveRelDistribution(type, keys);
+ }
+
+ @Override
+ public List getKeys() {
+ return keys;
+ }
+
+ @Override
+ public Type getType() {
+ return type;
+ }
+
+}
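A minimal usage sketch for the new trait (key positions illustrative, assuming Guava's ImmutableList): under satisfies(), two hash distributions match exactly when their key lists are equal.

    // Hash-distribute on columns 0 and 2 of the input row.
    RelDistribution dist = new HiveRelDistribution(
        RelDistribution.Type.HASH_DISTRIBUTED, ImmutableList.of(0, 2));
    boolean ok = dist.satisfies(new HiveRelDistribution(
        RelDistribution.Type.HASH_DISTRIBUTED, ImmutableList.of(0, 2))); // true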
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java (revision 1672450)
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelCollation extends RelCollationImpl {
+
+ public HiveRelCollation(ImmutableList<RelFieldCollation> fieldCollations) {
+ super(fieldCollations);
+ }
+
+}
+
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java (revision 1672450)
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.plan.Context;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+public class HiveConfigContext implements Context {
+ private HiveConf config;
+
+ public HiveConfigContext(HiveConf config) {
+ this.config = config;
+ }
+
+ public <T> T unwrap(Class<T> clazz) {
+ if (clazz.isInstance(config)) {
+ return clazz.cast(config);
+ }
+ return null;
+ }
+}
\ No newline at end of file
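This context is handed to the planner via HiveVolcanoPlanner.createPlanner(confContext) in the CalcitePlanner hunk above, so rules and cost models can recover the configuration later. A minimal sketch, assuming access to the RelOptPlanner:

    // Inside a rule or cost model: recover the HiveConf from the planner context.
    HiveConf hiveConf = planner.getContext().unwrap(HiveConf.class);
    if (hiveConf != null) {
      boolean extended = hiveConf.getBoolVar(HiveConf.ConfVars.EXTENDED_COST_MODEL);
    }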
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java (working copy)
@@ -28,8 +28,10 @@
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
@@ -50,13 +52,18 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -319,11 +326,11 @@
return this.mapOfProjIndxInJoinSchemaToLeafPInfo;
}
- public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j) {
+ public static JoinPredicateInfo constructJoinPredicateInfo(Join j) {
return constructJoinPredicateInfo(j, j.getCondition());
}
- public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j, RexNode predicate) {
+ public static JoinPredicateInfo constructJoinPredicateInfo(Join j, RexNode predicate) {
JoinPredicateInfo jpi = null;
JoinLeafPredicateInfo jlpi = null;
List<JoinLeafPredicateInfo> equiLPIList = new ArrayList<JoinLeafPredicateInfo>();
@@ -432,6 +439,16 @@
.copyOf(projsFromRightPartOfJoinKeysInJoinSchema);
}
+ public List<RexNode> getJoinKeyExprs(int input) {
+ if (input == 0) {
+ return this.joinKeyExprsFromLeft;
+ }
+ if (input == 1) {
+ return this.joinKeyExprsFromRight;
+ }
+ return null;
+ }
+
public List<RexNode> getJoinKeyExprsFromLeft() {
return this.joinKeyExprsFromLeft;
}
@@ -461,7 +478,7 @@
return this.projsFromRightPartOfJoinKeysInJoinSchema;
}
- private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(HiveJoin j, RexNode pe) {
+ private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(Join j, RexNode pe) {
JoinLeafPredicateInfo jlpi = null;
List<Boolean> filterNulls = new ArrayList<Boolean>();
List<RexNode> joinKeyExprsFromLeft = new ArrayList<RexNode>();
@@ -561,6 +578,107 @@
return deterministic;
}
+ public static <T> ImmutableMap<Integer, T> getColInfoMap(List<T> hiveCols,
+ int startIndx) {
+ Builder<Integer, T> bldr = ImmutableMap.<Integer, T> builder();
+
+ int indx = startIndx;
+ for (T ci : hiveCols) {
+ bldr.put(indx, ci);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<Integer, VirtualColumn> shiftVColsMap(Map<Integer, VirtualColumn> hiveVCols,
+ int shift) {
+ Builder<Integer, VirtualColumn> bldr = ImmutableMap.<Integer, VirtualColumn> builder();
+
+ for (Integer pos : hiveVCols.keySet()) {
+ bldr.put(shift + pos, hiveVCols.get(pos));
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<Integer, VirtualColumn> getVColsMap(List<VirtualColumn> hiveVCols,
+ int startIndx) {
+ Builder<Integer, VirtualColumn> bldr = ImmutableMap.<Integer, VirtualColumn> builder();
+
+ int indx = startIndx;
+ for (VirtualColumn vc : hiveVCols) {
+ bldr.put(indx, vc);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<String, Integer> getColNameIndxMap(List<FieldSchema> tableFields) {
+ Builder<String, Integer> bldr = ImmutableMap.<String, Integer> builder();
+
+ int indx = 0;
+ for (FieldSchema fs : tableFields) {
+ bldr.put(fs.getName(), indx);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<String, Integer> getRowColNameIndxMap(List<RelDataTypeField> rowFields) {
+ Builder<String, Integer> bldr = ImmutableMap.<String, Integer> builder();
+
+ int indx = 0;
+ for (RelDataTypeField rdt : rowFields) {
+ bldr.put(rdt.getName(), indx);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableList<RexNode> getInputRef(List<Integer> inputRefs, RelNode inputRel) {
+ ImmutableList.Builder<RexNode> bldr = ImmutableList.<RexNode> builder();
+ for (int i : inputRefs) {
+ bldr.add(new RexInputRef(i, (RelDataType) inputRel.getRowType().getFieldList().get(i).getType()));
+ }
+ return bldr.build();
+ }
+
+ public static ExprNodeDesc getExprNode(Integer inputRefIndx, RelNode inputRel,
+ ExprNodeConverter exprConv) {
+ ExprNodeDesc exprNode = null;
+ RexNode rexInputRef = new RexInputRef(inputRefIndx, (RelDataType) inputRel.getRowType()
+ .getFieldList().get(inputRefIndx).getType());
+ exprNode = rexInputRef.accept(exprConv);
+
+ return exprNode;
+ }
+
+ public static List<ExprNodeDesc> getExprNodes(List<Integer> inputRefs, RelNode inputRel,
+ String inputTabAlias) {
+ List<ExprNodeDesc> exprNodes = new ArrayList<ExprNodeDesc>();
+ List<RexNode> rexInputRefs = getInputRef(inputRefs, inputRel);
+ // TODO: Change ExprNodeConverter to be independent of Partition Expr
+ ExprNodeConverter exprConv = new ExprNodeConverter(inputTabAlias, inputRel.getRowType(),
+ false, inputRel.getCluster().getTypeFactory());
+ for (RexNode iRef : rexInputRefs) {
+ exprNodes.add(iRef.accept(exprConv));
+ }
+ return exprNodes;
+ }
+
+ public static List<String> getFieldNames(List<Integer> inputRefs, RelNode inputRel) {
+ List<String> fieldNames = new ArrayList<String>();
+ List<String> schemaNames = inputRel.getRowType().getFieldNames();
+ for (Integer iRef : inputRefs) {
+ fieldNames.add(schemaNames.get(iRef));
+ }
+
+ return fieldNames;
+ }
+
/**
* Walks over an expression and determines whether it is constant.
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java (revision 1672450)
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdParallelism;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+
+public class HiveRelMdParallelism extends RelMdParallelism {
+
+ private final Double maxSplitSize;
+
+ //~ Constructors -----------------------------------------------------------
+
+ public HiveRelMdParallelism(Double maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
+ }
+
+ public RelMetadataProvider getMetadataProvider() {
+ return ReflectiveRelMetadataProvider.reflectiveSource(this,
+ BuiltInMethod.IS_PHASE_TRANSITION.method,
+ BuiltInMethod.SPLIT_COUNT.method);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ public Boolean isPhaseTransition(HiveJoin join) {
+ // As Exchange operator is introduced later on, we make a
+ // common join operator create a new stage for the moment
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ return true;
+ }
+ return false;
+ }
+
+ public Boolean isPhaseTransition(HiveSort sort) {
+ // As Exchange operator is introduced later on, we make a
+ // sort operator create a new stage for the moment
+ return true;
+ }
+
+ public Integer splitCount(HiveJoin join) {
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ return splitCountRepartition(join);
+ }
+ else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.SMB_JOIN) {
+ RelNode largeInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ largeInput = join.getLeft();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ largeInput = join.getRight();
+ } else {
+ return null;
+ }
+ return splitCount(largeInput);
+ }
+ return null;
+ }
+
+ public Integer splitCount(HiveTableScan scan) {
+ RelOptHiveTable table = (RelOptHiveTable) scan.getTable();
+ return table.getHiveTableMD().getNumBuckets();
+ }
+
+ public Integer splitCount(RelNode rel) {
+ Boolean newPhase = RelMetadataQuery.isPhaseTransition(rel);
+
+ if (newPhase == null) {
+ return null;
+ }
+
+ if (newPhase) {
+ // We repartition: new number of splits
+ return splitCountRepartition(rel);
+ }
+
+ // We do not repartition: take number of splits from children
+ Integer splitCount = 0;
+ for (RelNode input : rel.getInputs()) {
+ splitCount += RelMetadataQuery.splitCount(input);
+ }
+ return splitCount;
+ }
+
+ public Integer splitCountRepartition(RelNode rel) {
+ // We repartition: new number of splits
+ final Double averageRowSize = RelMetadataQuery.getAverageRowSize(rel);
+ final Double rowCount = RelMetadataQuery.getRowCount(rel);
+ if (averageRowSize == null || rowCount == null) {
+ return null;
+ }
+ final Double totalSize = averageRowSize * rowCount;
+ final Double splitCount = totalSize / maxSplitSize;
+ return splitCount.intValue();
+ }
+
+}
+
+// End HiveRelMdParallelism.java
\ No newline at end of file
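A worked example of splitCountRepartition (numbers illustrative): with an average row size of 200 bytes, a row count of 10,000,000 and maxSplitSize of 256 MB (268,435,456 bytes), totalSize = 200 * 10,000,000 = 2,000,000,000 bytes, so splitCount = 2,000,000,000 / 268,435,456 ≈ 7.45, which intValue() truncates to 7 splits. Note that any input smaller than maxSplitSize truncates to 0.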
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java (revision 1672450)
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdMemory;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+
+public class HiveRelMdMemory extends RelMdMemory {
+
+ private static final HiveRelMdMemory INSTANCE = new HiveRelMdMemory();
+
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+ BuiltInMethod.MEMORY.method,
+ BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE.method,
+ BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE_SPLIT.method);
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdMemory() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public Double memory(HiveTableScan tableScan) {
+ return 0.0d;
+ }
+
+ public Double memory(HiveAggregate aggregate) {
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+ final Double rowCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ return avgRowSize * rowCount;
+ }
+
+ public Double memory(HiveFilter filter) {
+ return 0.0;
+ }
+
+ public Double memory(HiveJoin join) {
+ Double memory = 0.0;
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ // Left side
+ final Double leftAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double leftRowCount = RelMetadataQuery.getRowCount(join.getLeft());
+ if (leftAvgRowSize == null || leftRowCount == null) {
+ return null;
+ }
+ memory += leftAvgRowSize * leftRowCount;
+ // Right side
+ final Double rightAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ final Double rightRowCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (rightAvgRowSize == null || rightRowCount == null) {
+ return null;
+ }
+ memory += rightAvgRowSize * rightRowCount;
+ } else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ RelNode inMemoryInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ inMemoryInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ inMemoryInput = join.getLeft();
+ } else {
+ return null;
+ }
+ // Result
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(inMemoryInput);
+ final Double rowCount = RelMetadataQuery.getRowCount(inMemoryInput);
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ memory = avgRowSize * rowCount;
+ }
+ return memory;
+ }
+
+ public Double cumulativeMemoryWithinPhaseSplit(HiveJoin join) {
+ if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ // Check streaming side
+ RelNode inMemoryInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ inMemoryInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ inMemoryInput = join.getLeft();
+ } else {
+ return null;
+ }
+
+ if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN) {
+ // If simple map join, the whole relation goes in memory
+ return RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+ }
+ else if (join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ // If bucket map join, only a split goes in memory
+ final Double memoryInput =
+ RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+ final Integer splitCount = RelMetadataQuery.splitCount(inMemoryInput);
+ if (memoryInput == null || splitCount == null) {
+ return null;
+ }
+ return memoryInput / splitCount;
+ }
+ }
+ // Else, we fall back to default
+ return super.cumulativeMemoryWithinPhaseSplit(join);
+ }
+
+ public Double memory(HiveLimit limit) {
+ return 0.0;
+ }
+
+ public Double memory(HiveProject project) {
+ return 0.0;
+ }
+
+ public Double memory(HiveSort sort) {
+ if (sort.getCollation() != RelCollations.EMPTY) {
+ // It sorts
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(sort.getInput());
+ final Double rowCount = RelMetadataQuery.getRowCount(sort.getInput());
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ return avgRowSize * rowCount;
+ }
+ // It does not sort, memory footprint is zero
+ return 0.0;
+ }
+
+ public Double memory(HiveUnion union) {
+ return 0.0;
+ }
+
+}
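A worked example of the memory estimates above (numbers illustrative): a map join streaming its left input charges only the in-memory right side, e.g. average row size 100 bytes * 50,000 rows = 5,000,000 bytes; a common join instead charges the sum of both sides, since both inputs are buffered.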
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java (revision 1672450)
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdSize;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdSize extends RelMdSize {
+
+ private static final HiveRelMdSize INSTANCE = new HiveRelMdSize();
+
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+ BuiltInMethod.AVERAGE_COLUMN_SIZES.method,
+ BuiltInMethod.AVERAGE_ROW_SIZE.method);
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdSize() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public List<Double> averageColumnSizes(HiveTableScan scan) {
+ List<Integer> neededcolsLst = scan.getNeededColIndxsFrmReloptHT();
+ Set<Integer> needColsSet = new HashSet<Integer>(neededcolsLst);
+ List<ColStatistics> columnStatistics = ((RelOptHiveTable) scan.getTable())
+ .getColStat(neededcolsLst);
+
+ // Obtain list of col stats, or use default if they are not available
+ final ImmutableList.Builder<Double> list = ImmutableList.builder();
+ int indxRqdCol = 0;
+ int nFields = scan.getRowType().getFieldCount();
+ for (int i = 0; i < nFields; i++) {
+ if (needColsSet.contains(i)) {
+ ColStatistics columnStatistic = columnStatistics.get(indxRqdCol);
+ indxRqdCol++;
+ if (columnStatistic == null) {
+ RelDataTypeField field = scan.getPrunedRowType().getFieldList().get(i);
+ list.add(averageTypeValueSize(field.getType()));
+ } else {
+ list.add(columnStatistic.getAvgColLen());
+ }
+ } else {
+ list.add(new Double(0));
+ }
+ }
+
+ return list.build();
+ }
+
+ public List<Double> averageColumnSizes(HiveJoin rel) {
+ final RelNode left = rel.getLeft();
+ final RelNode right = rel.getRight();
+ final List<Double> lefts =
+ RelMetadataQuery.getAverageColumnSizes(left);
+ List<Double> rights = null;
+ if (!rel.isLeftSemiJoin()) {
+ rights = RelMetadataQuery.getAverageColumnSizes(right);
+ }
+ if (lefts == null && rights == null) {
+ return null;
+ }
+ final int fieldCount = rel.getRowType().getFieldCount();
+ Double[] sizes = new Double[fieldCount];
+ if (lefts != null) {
+ lefts.toArray(sizes);
+ }
+ if (rights != null) {
+ final int leftCount = left.getRowType().getFieldCount();
+ for (int i = 0; i < rights.size(); i++) {
+ sizes[leftCount + i] = rights.get(i);
+ }
+ }
+ return ImmutableNullableList.copyOf(sizes);
+ }
+
+ // TODO: remove when the averageTypeValueSize method in RelMdSize
+ // supports all types
+ public Double averageTypeValueSize(RelDataType type) {
+ switch (type.getSqlTypeName()) {
+ case BOOLEAN:
+ case TINYINT:
+ return 1d;
+ case SMALLINT:
+ return 2d;
+ case INTEGER:
+ case FLOAT:
+ case REAL:
+ case DECIMAL:
+ case DATE:
+ case TIME:
+ return 4d;
+ case BIGINT:
+ case DOUBLE:
+ case TIMESTAMP:
+ case INTERVAL_DAY_TIME:
+ case INTERVAL_YEAR_MONTH:
+ return 8d;
+ case BINARY:
+ return (double) type.getPrecision();
+ case VARBINARY:
+ return Math.min((double) type.getPrecision(), 100d);
+ case CHAR:
+ return (double) type.getPrecision() * BYTES_PER_CHARACTER;
+ case VARCHAR:
+ // Even in large (say VARCHAR(2000)) columns most strings are small
+ return Math.min((double) type.getPrecision() * BYTES_PER_CHARACTER, 100d);
+ case ROW:
+ Double average = 0.0;
+ for (RelDataTypeField field : type.getFieldList()) {
+ average += averageTypeValueSize(field.getType());
+ }
+ return average;
+ default:
+ return null;
+ }
+ }
+
+}
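
The per-type defaults in averageTypeValueSize only apply when column statistics are missing; otherwise the recorded average column length is used, and the average row size is the sum over projected columns. A standalone sketch of that fallback, using a few assumed widths taken from the switch above (names are illustrative):

import java.util.Arrays;
import java.util.List;

public class AvgRowSizeSketch {

  // Type-based defaults, mirroring a subset of averageTypeValueSize
  static double defaultWidth(String sqlType) {
    switch (sqlType) {
      case "BOOLEAN": case "TINYINT": return 1d;
      case "SMALLINT":                return 2d;
      case "INTEGER": case "FLOAT":   return 4d;
      case "BIGINT":  case "DOUBLE":  return 8d;
      default:                        return 8d; // illustrative fallback
    }
  }

  // A null entry means no column statistic is available for that column
  static double averageRowSize(List<Double> colStatWidths, List<String> types) {
    double total = 0d;
    for (int i = 0; i < types.size(); i++) {
      Double w = colStatWidths.get(i);
      total += (w != null) ? w : defaultWidth(types.get(i));
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(averageRowSize(
        Arrays.asList(null, 3.2),
        Arrays.asList("INTEGER", "VARCHAR"))); // 4.0 + 3.2 = 7.2
  }
}
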
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java (revision 1672450)
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdDistribution {
+
+ public static final RelMetadataProvider SOURCE =
+ ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.DISTRIBUTION.method, new HiveRelMdDistribution()),
+ RelMdDistribution.SOURCE));
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdDistribution() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public RelDistribution distribution(HiveAggregate aggregate) {
+ return new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+ aggregate.getGroupSet().asList());
+ }
+
+ public RelDistribution distribution(HiveJoin join) {
+ // Compute distribution
+ ImmutableList.Builder<Integer> keysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ ImmutableList.Builder<Integer> leftKeysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ ImmutableList.Builder<Integer> rightKeysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ JoinPredicateInfo joinPredInfo =
+ HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+ for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+ JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+ getEquiJoinPredicateElements().get(i);
+ for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+ keysListBuilder.add(leftPos);
+ leftKeysListBuilder.add(leftPos);
+ }
+ for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+ keysListBuilder.add(rightPos);
+ rightKeysListBuilder.add(rightPos);
+ }
+ }
+
+ RelDistribution distribution;
+ switch (join.getJoinAlgorithm()) {
+ case SMB_JOIN:
+ case BUCKET_JOIN:
+ case COMMON_JOIN:
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, keysListBuilder.build());
+ break;
+ case MAP_JOIN:
+ // Keep buckets from the streaming relation
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, leftKeysListBuilder.build());
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, rightKeysListBuilder.build());
+ } else {
+ distribution = null;
+ }
+ break;
+ default:
+ distribution = null;
+ }
+ return distribution;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java (revision 1672450)
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdCollation {
+
+ public static final RelMetadataProvider SOURCE =
+ ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.COLLATIONS.method, new HiveRelMdCollation()),
+ RelMdCollation.SOURCE));
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdCollation() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public ImmutableList<RelCollation> collations(HiveAggregate aggregate) {
+ // Compute collations
+ ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ for (int pos : aggregate.getGroupSet().asList()) {
+ final RelFieldCollation fieldCollation = new RelFieldCollation(pos);
+ collationListBuilder.add(fieldCollation);
+ }
+ // Return aggregate collations
+ return ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(collationListBuilder.build())));
+ }
+
+ public ImmutableList<RelCollation> collations(HiveJoin join) {
+ // Compute collations
+ ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ ImmutableList.Builder<RelFieldCollation> leftCollationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ ImmutableList.Builder<RelFieldCollation> rightCollationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ JoinPredicateInfo joinPredInfo =
+ HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+ for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+ JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+ getEquiJoinPredicateElements().get(i);
+ for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+ final RelFieldCollation leftFieldCollation = new RelFieldCollation(leftPos);
+ collationListBuilder.add(leftFieldCollation);
+ leftCollationListBuilder.add(leftFieldCollation);
+ }
+ for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+ final RelFieldCollation rightFieldCollation = new RelFieldCollation(rightPos);
+ collationListBuilder.add(rightFieldCollation);
+ rightCollationListBuilder.add(rightFieldCollation);
+ }
+ }
+
+ // Return join collations
+ final ImmutableList<RelCollation> collation;
+ switch (join.getJoinAlgorithm()) {
+ case SMB_JOIN:
+ case COMMON_JOIN:
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(collationListBuilder.build())));
+ break;
+ case BUCKET_JOIN:
+ case MAP_JOIN:
+ // Keep order from the streaming relation
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(leftCollationListBuilder.build())));
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(rightCollationListBuilder.build())));
+ } else {
+ collation = null;
+ }
+ break;
+ default:
+ collation = null;
+ }
+ return collation;
+ }
+
+}
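
Both providers above apply the same trait-propagation rule: sort-merge style joins (COMMON_JOIN, SMB_JOIN) expose collations and distributions built from both sides' join keys, while a map join preserves only the traits of its streaming input. A standalone sketch of that rule (names are illustrative, not patch code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinTraitSketch {

  enum Streaming { LEFT, RIGHT, NONE }

  static List<Integer> preservedKeys(List<Integer> leftKeys, List<Integer> rightKeys,
      boolean sortMerge, Streaming streaming) {
    List<Integer> keys = new ArrayList<Integer>();
    if (sortMerge) {                          // COMMON_JOIN / SMB_JOIN: both sides
      keys.addAll(leftKeys);
      keys.addAll(rightKeys);
    } else if (streaming == Streaming.LEFT) { // MAP_JOIN: streaming side only
      keys.addAll(leftKeys);
    } else if (streaming == Streaming.RIGHT) {
      keys.addAll(rightKeys);
    }
    return keys;
  }

  public static void main(String[] args) {
    // A map join streaming the right input keeps only the right-side keys
    System.out.println(preservedKeys(
        Arrays.asList(0), Arrays.asList(3), false, Streaming.RIGHT)); // [3]
  }
}
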
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
import java.util.ArrayList;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
import java.util.BitSet;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java (working copy)
@@ -20,21 +20,64 @@
import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveDefaultCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveRelMdCost;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveOnTezCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdMemory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdParallelism;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdRowCount;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSelectivity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSize;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdUniqueKeys;
import com.google.common.collect.ImmutableList;
public class HiveDefaultRelMetadataProvider {
- private HiveDefaultRelMetadataProvider() {
+
+ private final HiveConf hiveConf;
+
+
+ public HiveDefaultRelMetadataProvider(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
}
- public static final RelMetadataProvider INSTANCE = ChainedRelMetadataProvider.of(ImmutableList
- .of(HiveRelMdDistinctRowCount.SOURCE,
+ public RelMetadataProvider getMetadataProvider() {
+
+ // Create cost metadata provider
+ final HiveCostModel cm;
+ if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) {
+ final Double maxMemory = (double) HiveConf.getLongVar(
+ this.hiveConf,
+ HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ cm = new HiveOnTezCostModel(maxMemory);
+ } else {
+ cm = new HiveDefaultCostModel();
+ }
+
+ // Get max split size for HiveRelMdParallelism
+ final Double maxSplitSize = (double) HiveConf.getLongVar(
+ this.hiveConf,
+ HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+
+ // Return MD provider
+ return ChainedRelMetadataProvider.of(ImmutableList
+ .of(new HiveRelMdCost(cm).getMetadataProvider(),
+ HiveRelMdDistinctRowCount.SOURCE,
HiveRelMdSelectivity.SOURCE,
HiveRelMdRowCount.SOURCE,
HiveRelMdUniqueKeys.SOURCE,
+ HiveRelMdSize.SOURCE,
+ HiveRelMdMemory.SOURCE,
+ new HiveRelMdParallelism(maxSplitSize).getMetadataProvider(),
+ HiveRelMdDistribution.SOURCE,
+ HiveRelMdCollation.SOURCE,
new DefaultRelMetadataProvider()));
}
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java (revision 1672450)
@@ -0,0 +1,49 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SortExchange;
+
+public class HiveSortExchange extends SortExchange {
+
+ private HiveSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, RelDistribution distribution, RelCollation collation) {
+ super(cluster, traitSet, input, distribution, collation);
+ }
+
+ public HiveSortExchange(RelInput input) {
+ super(input);
+ }
+
+ /**
+ * Creates a HiveSortExchange.
+ *
+ * @param input Input relational expression
+ * @param distribution Distribution specification
+ * @param collation Collation specification
+ */
+ public static HiveSortExchange create(RelNode input,
+ RelDistribution distribution, RelCollation collation) {
+ RelOptCluster cluster = input.getCluster();
+ distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+ RelTraitSet traitSet =
+ input.getTraitSet().replace(Convention.NONE).replace(distribution);
+ collation = RelCollationTraitDef.INSTANCE.canonize(collation);
+ return new HiveSortExchange(cluster, traitSet, input, distribution, collation);
+ }
+
+ @Override
+ public SortExchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution,
+ RelCollation newCollation) {
+ return new HiveSortExchange(getCluster(), traitSet, newInput,
+ newDistribution, newCollation);
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java (working copy)
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
@@ -25,7 +27,9 @@
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.RelFactories.JoinFactory;
@@ -33,19 +37,15 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
//TODO: Should we convert MultiJoin to be a child of HiveJoin
public class HiveJoin extends Join implements HiveRelNode {
- // NOTE: COMMON_JOIN & SMB_JOIN are Sort Merge Join (in case of COMMON_JOIN
- // each parallel computation handles multiple splits where as in case of SMB
- // each parallel computation handles one bucket). MAP_JOIN and BUCKET_JOIN is
- // hash joins where MAP_JOIN keeps the whole data set of non streaming tables
- // in memory where as BUCKET_JOIN keeps only the b
- public enum JoinAlgorithm {
- NONE, COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN
- }
public enum MapJoinStreamingRelation {
NONE, LEFT_RELATION, RIGHT_RELATION
@@ -54,17 +54,20 @@
public static final JoinFactory HIVE_JOIN_FACTORY = new HiveJoinFactoryImpl();
private final boolean leftSemiJoin;
- private final JoinAlgorithm joinAlgorithm;
- //This will be used once we do Join Algorithm selection
- @SuppressWarnings("unused")
- private final MapJoinStreamingRelation mapJoinStreamingSide = MapJoinStreamingRelation.NONE;
+ private JoinAlgorithm joinAlgorithm;
+ private MapJoinStreamingRelation mapJoinStreamingSide;
+ private RelOptCost joinCost;
+ // Whether inputs are already sorted
+ private ImmutableBitSet sortedInputs;
public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode right,
RexNode condition, JoinRelType joinType, boolean leftSemiJoin) {
try {
Set<String> variablesStopped = Collections.emptySet();
- return new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
- JoinAlgorithm.NONE, null, leftSemiJoin);
+ HiveJoin join = new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
+ JoinAlgorithm.NONE, chooseStreamingSide(left,right), null, leftSemiJoin);
+ join.sortedInputs = checkInputsCorrectOrder(join);
+ return join;
} catch (InvalidRelException e) {
throw new RuntimeException(e);
}
@@ -72,11 +75,13 @@
protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
- JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin, boolean leftSemiJoin)
- throws InvalidRelException {
+ JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin,
+ ImmutableBitSet sortedInputs, boolean leftSemiJoin) throws InvalidRelException {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType,
variablesStopped);
this.joinAlgorithm = joinAlgo;
+ this.mapJoinStreamingSide = streamingSideForMapJoin;
+ this.sortedInputs = sortedInputs;
this.leftSemiJoin = leftSemiJoin;
}
@@ -90,7 +95,7 @@
try {
Set<String> variablesStopped = Collections.emptySet();
return new HiveJoin(getCluster(), traitSet, left, right, conditionExpr, joinType,
- variablesStopped, JoinAlgorithm.NONE, null, leftSemiJoin);
+ variablesStopped, joinAlgorithm, mapJoinStreamingSide, sortedInputs, leftSemiJoin);
} catch (InvalidRelException e) {
// Semantic error not possible. Must be a bug. Convert to
// internal error.
@@ -102,6 +107,22 @@
return joinAlgorithm;
}
+ public void setJoinAlgorithm(JoinAlgorithm joinAlgorithm) {
+ this.joinAlgorithm = joinAlgorithm;
+ }
+
+ public MapJoinStreamingRelation getMapJoinStreamingSide() {
+ return mapJoinStreamingSide;
+ }
+
+ public void setJoinCost(RelOptCost joinCost) {
+ this.joinCost = joinCost;
+ }
+
+ public ImmutableBitSet getSortedInputs() {
+ return sortedInputs;
+ }
+
public boolean isLeftSemiJoin() {
return leftSemiJoin;
}
@@ -111,11 +132,57 @@
*/
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- double leftRCount = RelMetadataQuery.getRowCount(getLeft());
- double rightRCount = RelMetadataQuery.getRowCount(getRight());
- return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
+ private static MapJoinStreamingRelation chooseStreamingSide(RelNode left,
+ RelNode right) {
+ Double leftInputSize = RelMetadataQuery.memory(left);
+ Double rightInputSize = RelMetadataQuery.memory(right);
+ if (leftInputSize == null && rightInputSize == null) {
+ return MapJoinStreamingRelation.NONE;
+ } else if (leftInputSize != null &&
+ (rightInputSize == null ||
+ (leftInputSize < rightInputSize))) {
+ return MapJoinStreamingRelation.RIGHT_RELATION;
+ } else if (rightInputSize != null &&
+ (leftInputSize == null ||
+ (rightInputSize <= leftInputSize))) {
+ return MapJoinStreamingRelation.LEFT_RELATION;
+ }
+ return MapJoinStreamingRelation.NONE;
+ }
+
+ private static ImmutableBitSet checkInputsCorrectOrder(HiveJoin join) {
+ ImmutableBitSet.Builder sortedInputs = new ImmutableBitSet.Builder();
+ JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+ constructJoinPredicateInfo(join);
+ List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+ for (int i=0; i groupSets,
List<AggregateCall> aggCalls) throws InvalidRelException {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), child, indicator, groupSet,
groupSets, aggCalls);
+ this.bucketedInput = checkInputCorrectBucketing(child, groupSet);
}
@Override
@@ -66,15 +69,28 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
+ private static boolean checkInputCorrectBucketing(RelNode child, ImmutableBitSet groupSet) {
+ return false;
+ //TODO: Enable this again
+ /*
+ return RelMetadataQuery.distribution(child).getKeys().
+ containsAll(groupSet.asList());
+ */
+ }
+
@Override
public double getRows() {
return RelMetadataQuery.getDistinctRowCount(this, groupSet, getCluster().getRexBuilder()
.makeLiteral(true));
}
+ public boolean isBucketedInput() {
+ return this.bucketedInput;
+ }
+
private static class HiveAggRelFactory implements AggregateFactory {
@Override
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java (working copy)
@@ -29,6 +29,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
@@ -42,7 +43,6 @@
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
-
import com.google.common.collect.ImmutableList;
public class HiveProject extends Project implements HiveRelNode {
@@ -172,7 +172,7 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
@Override
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java (working copy)
@@ -17,21 +17,34 @@
*/
package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+
/**
* Relational expression representing a scan of a HiveDB collection.
*
@@ -42,6 +55,14 @@
*/
public class HiveTableScan extends TableScan implements HiveRelNode {
+ private final RelDataType hiveTableScanRowType;
+ private final ImmutableList<Integer> neededColIndxsFrmReloptHT;
+ private final String tblAlias;
+
+ public String getTableAlias() {
+ return tblAlias;
+ }
+
/**
* Creates a HiveTableScan.
*
@@ -54,10 +75,17 @@
* @param table
* HiveDB table
*/
- public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table,
- RelDataType rowtype) {
+ public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table, String alias) {
+ this(cluster, traitSet, table, alias, table.getRowType());
+ }
+
+ private HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table, String alias,
+ RelDataType newRowtype) {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), table);
assert getConvention() == HiveRelNode.CONVENTION;
+ this.tblAlias = alias;
+ this.hiveTableScanRowType = newRowtype;
+ this.neededColIndxsFrmReloptHT = buildNeededColIndxsFrmReloptHT(table.getRowType(), newRowtype);
}
@Override
@@ -66,9 +94,21 @@
return this;
}
+ /**
+ * Copy TableScan operator with a new Row Schema. The new Row Schema can only
+ * be a subset of this TS schema.
+ *
+ * @param newRowtype
+ * @return
+ */
+ public HiveTableScan copy(RelDataType newRowtype) {
+ return new HiveTableScan(getCluster(), getTraitSet(), ((RelOptHiveTable) table), this.tblAlias,
+ newRowtype);
+ }
+
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
@Override
@@ -89,4 +129,62 @@
public List<ColStatistics> getColStat(List<Integer> projIndxLst) {
return ((RelOptHiveTable) table).getColStat(projIndxLst);
}
-}
\ No newline at end of file
+
+ @Override
+ public RelNode project(ImmutableBitSet fieldsUsed, Set<RelDataTypeField> extraFields,
+ RelFactories.ProjectFactory projectFactory) {
+
+ // 1. If the schema is the same then bail out
+ final int fieldCount = getRowType().getFieldCount();
+ if (fieldsUsed.equals(ImmutableBitSet.range(fieldCount)) && extraFields.isEmpty()) {
+ return this;
+ }
+
+ // 2. Make sure there is no dynamic addition of virtual cols
+ if (extraFields != null && !extraFields.isEmpty()) {
+ throw new RuntimeException("Hive TS does not support adding virtual columns dynamically");
+ }
+
+ // 3. Create new TS schema that is a subset of original
+ final List<RelDataTypeField> fields = getRowType().getFieldList();
+ List<RelDataType> fieldTypes = new LinkedList<RelDataType>();
+ List<String> fieldNames = new LinkedList<String>();
+ List<RexNode> exprList = new ArrayList<RexNode>();
+ RexBuilder rexBuilder = getCluster().getRexBuilder();
+ for (int i : fieldsUsed) {
+ RelDataTypeField field = fields.get(i);
+ fieldTypes.add(field.getType());
+ fieldNames.add(field.getName());
+ exprList.add(rexBuilder.makeInputRef(this, i));
+ }
+
+ // 4. Build new TS
+ HiveTableScan newHT = copy(getCluster().getTypeFactory().createStructType(fieldTypes,
+ fieldNames));
+
+ // 5. Add Proj on top of TS
+ return projectFactory.createProject(newHT, exprList, new ArrayList<String>(fieldNames));
+ }
+
+ public List<Integer> getNeededColIndxsFrmReloptHT() {
+ return neededColIndxsFrmReloptHT;
+ }
+
+ public RelDataType getPrunedRowType() {
+ return hiveTableScanRowType;
+ }
+
+ private static ImmutableList<Integer> buildNeededColIndxsFrmReloptHT(RelDataType htRowtype,
+ RelDataType scanRowType) {
+ Builder<Integer> neededColIndxsFrmReloptHTBldr = new ImmutableList.Builder<Integer>();
+ Map<String, Integer> colNameToPosInReloptHT = HiveCalciteUtil.getRowColNameIndxMap(htRowtype
+ .getFieldList());
+ List<String> colNamesInScanRowType = scanRowType.getFieldNames();
+
+ for (int i = 0; i < colNamesInScanRowType.size(); i++) {
+ neededColIndxsFrmReloptHTBldr.add(colNameToPosInReloptHT.get(colNamesInScanRowType.get(i)));
+ }
+
+ return neededColIndxsFrmReloptHTBldr.build();
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java (working copy)
@@ -25,9 +25,9 @@
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
public class HiveLimit extends SingleRel implements HiveRelNode {
private final RexNode offset;
@@ -52,6 +52,6 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java (revision 1672450)
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.EnumSet;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+/**
+ * Cost model interface.
+ */
+public abstract class HiveCostModel {
+
+ private static final Log LOG = LogFactory.getLog(HiveCostModel.class);
+
+ // NOTE: COMMON_JOIN & SMB_JOIN are sort-merge joins (for COMMON_JOIN each
+ // parallel computation handles multiple splits, whereas for SMB each
+ // parallel computation handles one bucket). MAP_JOIN and BUCKET_JOIN are
+ // hash joins, where MAP_JOIN keeps the whole data set of the non-streaming
+ // tables in memory, whereas BUCKET_JOIN keeps only the matching bucket.
+ public enum JoinAlgorithm {
+ NONE, COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN
+ }
+
+ public abstract RelOptCost getDefaultCost();
+
+ public abstract RelOptCost getAggregateCost(HiveAggregate aggregate);
+
+ public RelOptCost getJoinCost(HiveJoin join) {
+ // Retrieve algorithms
+ EnumSet<JoinAlgorithm> possibleAlgorithms = getExecutableJoinAlgorithms(join);
+
+ // Select algorithm with min cost
+ JoinAlgorithm joinAlgorithm = null;
+ RelOptCost minJoinCost = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(join));
+ }
+ for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) {
+ RelOptCost joinCost = getJoinCost(join, possibleAlgorithm);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(possibleAlgorithm + " cost: " + joinCost);
+ }
+ if (minJoinCost == null || joinCost.isLt(minJoinCost)) {
+ joinAlgorithm = possibleAlgorithm;
+ minJoinCost = joinCost;
+ }
+ }
+ join.setJoinAlgorithm(joinAlgorithm);
+ join.setJoinCost(minJoinCost);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(joinAlgorithm + " selected");
+ }
+
+ return minJoinCost;
+ }
+
+ /**
+ * Returns the possible algorithms for a given join operator.
+ *
+ * @param join the join operator
+ * @return a set containing all the possible join algorithms that can be
+ * executed for this join operator
+ */
+ abstract EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join);
+
+ /**
+ * Returns the cost for a given algorithm and execution engine.
+ *
+ * @param join the join operator
+ * @param algorithm the join algorithm
+ * @return the cost for the given algorithm, or null if the algorithm is not
+ * defined for this execution engine
+ */
+ abstract RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm);
+}
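
getJoinCost above is a straightforward argmin over the executable algorithms. A standalone sketch of the same selection loop, with plain doubles standing in for RelOptCost and made-up cost figures:

import java.util.EnumMap;
import java.util.Map;

public class JoinAlgorithmSelectionSketch {

  enum JoinAlgorithm { COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN }

  public static void main(String[] args) {
    // Hypothetical per-algorithm costs for one join
    Map<JoinAlgorithm, Double> costs = new EnumMap<JoinAlgorithm, Double>(JoinAlgorithm.class);
    costs.put(JoinAlgorithm.COMMON_JOIN, 950.0);
    costs.put(JoinAlgorithm.MAP_JOIN, 310.0);
    costs.put(JoinAlgorithm.BUCKET_JOIN, 280.0);

    // Keep the cheapest algorithm seen so far, as getJoinCost does
    JoinAlgorithm best = null;
    Double minCost = null;
    for (Map.Entry<JoinAlgorithm, Double> e : costs.entrySet()) {
      if (minCost == null || e.getValue() < minCost) {
        best = e.getKey();
        minCost = e.getValue();
      }
    }
    System.out.println(best + " selected, cost " + minCost); // BUCKET_JOIN
  }
}
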
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java (revision 1672450)
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Cost model for Tez execution engine.
+ */
+public class HiveOnTezCostModel extends HiveCostModel {
+
+ private final Double maxMemory;
+
+
+ public HiveOnTezCostModel(Double maxMemory) {
+ this.maxMemory = maxMemory;
+ }
+
+ @Override
+ public RelOptCost getDefaultCost() {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
+ if (aggregate.isBucketedInput()) {
+ return HiveCost.FACTORY.makeZeroCost();
+ } else {
+ // 1. Sum of input cardinalities
+ final Double rCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+ if (rCount == null) {
+ return null;
+ }
+ // 2. CPU cost = sorting cost
+ final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount);
+ // 3. IO cost = cost of writing intermediary results to local FS +
+ // cost of reading from local FS for transferring to GBy +
+ // cost of transferring map outputs to GBy operator
+ final Double rAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+ if (rAverageSize == null) {
+ return null;
+ }
+ final double ioCost = HiveCostUtil.computeSortIOCost(new Pair<Double,Double>(rCount, rAverageSize));
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+ }
+
+ @Override
+ protected EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join) {
+ Set<JoinAlgorithm> possibleAlgorithms = new HashSet<JoinAlgorithm>();
+
+ // Check streaming side
+ RelNode smallInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ smallInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ smallInput = join.getLeft();
+ } else {
+ smallInput = null;
+ }
+
+ if (smallInput != null) {
+ // Requirements:
+ // - For SMB, both inputs sorted and bucketed by their keys.
+ // - For Bucket, both inputs bucketed by their keys, and a bucket of the small side fits in memory.
+ // - For Map, no additional requirement, but the whole small side fits in memory.
+
+ // Get key columns
+ JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+ constructJoinPredicateInfo(join);
+ List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+ // Obtain number of buckets
+ Integer buckets = RelMetadataQuery.splitCount(smallInput);
+ // Obtain map algorithms for which smallest input fits in memory
+ boolean bucketFitsMemory = false;
+ boolean inputFitsMemory = false;
+ if (buckets != null) {
+ bucketFitsMemory = isFittingIntoMemory(maxMemory, smallInput, buckets);
+ }
+ inputFitsMemory = bucketFitsMemory ?
+ isFittingIntoMemory(maxMemory, smallInput, 1) : false;
+ boolean orderedBucketed = true;
+ boolean bucketed = true;
+ for (int i=0; i maxSize) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) {
+ RelOptCost algorithmCost;
+ switch (algorithm) {
+ case COMMON_JOIN:
+ algorithmCost = computeCostCommonJoin(join);
+ break;
+ case MAP_JOIN:
+ algorithmCost = computeCostMapJoin(join);
+ break;
+ case BUCKET_JOIN:
+ algorithmCost = computeCostBucketJoin(join);
+ break;
+ case SMB_JOIN:
+ algorithmCost = computeCostSMBJoin(join);
+ break;
+ default:
+ algorithmCost = null;
+ }
+ return algorithmCost;
+ }
+
+ private static RelOptCost computeCostCommonJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = sorting cost (for each relation) +
+ // total merge cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
+ // 3. IO cost = cost of writing intermediary results to local FS +
+ // cost of reading from local FS for transferring to join +
+ // cost of transferring map outputs to Join operator
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
+ add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
+ build();
+ final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostMapJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(cardinalities, streaming);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
+ add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostBucketJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(cardinalities, streaming);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
+ add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostSMBJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(cardinalities);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
+ add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+}
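
A rough standalone sketch of why this model can prefer MAP_JOIN over COMMON_JOIN: sort-merge pays an n*log(n) sort per unsorted input plus a merge, while map join pays a hash-table build on the small side plus a linear probe. The constants and formulas here are simplified assumptions, not the patch's HiveCostUtil values:

public class JoinCostComparisonSketch {

  static final double CPU_COST = 1.0; // assumed unit cost per tuple operation

  static double commonJoinCpu(double leftRows, double rightRows) {
    return leftRows * Math.log(leftRows) * CPU_COST      // sort left
        + rightRows * Math.log(rightRows) * CPU_COST     // sort right
        + (leftRows + rightRows) * CPU_COST;             // merge
  }

  static double mapJoinCpu(double bigRows, double smallRows) {
    return smallRows * CPU_COST   // hash-table build on the in-memory side
        + bigRows * CPU_COST;     // probe from the streaming side
  }

  public static void main(String[] args) {
    double big = 10_000_000d, small = 10_000d;
    System.out.printf("common=%.0f map=%.0f%n",
        commonJoinCpu(big, small), mapJoinCpu(big, small));
    // Map join wins here, provided the small side fits in memory.
  }
}
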
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java (revision 1672450)
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * HiveRelMdCost supplies the implementation of cost model.
+ */
+public class HiveRelMdCost {
+
+ private final HiveCostModel hiveCostModel;
+
+ public HiveRelMdCost(HiveCostModel hiveCostModel) {
+ this.hiveCostModel = hiveCostModel;
+ }
+
+ public RelMetadataProvider getMetadataProvider() {
+ return ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(this,
+ BuiltInMethod.NON_CUMULATIVE_COST.method),
+ RelMdPercentageOriginalRows.SOURCE));
+ }
+
+ public RelOptCost getNonCumulativeCost(HiveAggregate aggregate) {
+ return hiveCostModel.getAggregateCost(aggregate);
+ }
+
+ public RelOptCost getNonCumulativeCost(HiveJoin join) {
+ return hiveCostModel.getJoinCost(join);
+ }
+
+ // Default case
+ public RelOptCost getNonCumulativeCost(RelNode rel) {
+ return hiveCostModel.getDefaultCost();
+ }
+
+}
+
+// End HiveRelMdCost.java
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java (revision 1672450)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.EnumSet;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+/**
+ * Default implementation of the cost model.
+ * Currently used by MR and Spark execution engines.
+ */
+public class HiveDefaultCostModel extends HiveCostModel {
+
+ @Override
+ public RelOptCost getDefaultCost() {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+  protected EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join) {
+ return EnumSet.of(JoinAlgorithm.NONE);
+ }
+
+ @Override
+ protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) {
+ RelOptCost algorithmCost;
+ switch (algorithm) {
+ case NONE:
+ algorithmCost = computeJoinCardinalityCost(join);
+ break;
+ default:
+ algorithmCost = null;
+ }
+ return algorithmCost;
+ }
+
+ private static RelOptCost computeJoinCardinalityCost(HiveJoin join) {
+ double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
+ }
+
+}
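
Note: as the methods above show, the default model prices every aggregate at zero and a join at the sum of its input row counts, with CPU and IO ignored; join algorithm selection is effectively disabled since only JoinAlgorithm.NONE is executable. A hypothetical sanity check of that arithmetic (plain Java, not Hive code; the row counts are assumed):

public class DefaultJoinCostDemo {
  public static void main(String[] args) {
    double leftRows = 1000000.0;  // assumed row count of the left input
    double rightRows = 50000.0;   // assumed row count of the right input
    // Mirrors HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0)
    System.out.printf("rows=%.0f cpu=%.1f io=%.1f%n",
        leftRows + rightRows, 0.0, 0.0);
  }
}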
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java (working copy)
@@ -18,26 +18,160 @@
package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import com.google.common.collect.ImmutableList;
+
// Use this once we have Join Algorithm selection
public class HiveCostUtil {
- private static final double cpuCostInNanoSec = 1.0;
- private static final double netCostInNanoSec = 150 * cpuCostInNanoSec;
- private static final double localFSWriteCostInNanoSec = 4 * netCostInNanoSec;
- private static final double localFSReadCostInNanoSec = 4 * netCostInNanoSec;
- private static final double hDFSWriteCostInNanoSec = 10 * localFSWriteCostInNanoSec;
- @SuppressWarnings("unused")
-//Use this once we have Join Algorithm selection
- private static final double hDFSReadCostInNanoSec = 1.5 * localFSReadCostInNanoSec;
+ private static final double CPU_COST = 1.0;
+ private static final double NET_COST = 150.0 * CPU_COST;
+ private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
+ private static final double LOCAL_READ_COST = 4.0 * NET_COST;
+ private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST;
+ private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST;
+
public static RelOptCost computCardinalityBasedCost(HiveRelNode hr) {
return new HiveCost(hr.getRows(), 0, 0);
}
public static HiveCost computeCost(HiveTableScan t) {
double cardinality = t.getRows();
- return new HiveCost(cardinality, 0, hDFSWriteCostInNanoSec * cardinality * 0);
+ return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
}
+
+  public static double computeSortMergeCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet sorted) {
+    // Sort-merge join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!sorted.get(i)) {
+        // Sort cost
+        cpuCost += computeSortCPUCost(cardinality);
+      }
+      // Merge cost
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeSortCPUCost(Double cardinality) {
+    return cardinality * Math.log(cardinality) * CPU_COST;
+  }
+
+  public static double computeSortMergeIOCost(
+      ImmutableList<Pair<Double,Double>> relationInfos) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    for (Pair<Double,Double> relationInfo : relationInfos) {
+      ioCost += computeSortIOCost(relationInfo);
+    }
+    return ioCost;
+  }
+
+  public static double computeSortIOCost(Pair<Double,Double> relationInfo) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    double cardinality = relationInfo.left;
+    double averageTupleSize = relationInfo.right;
+    // Write cost
+    ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+    // Read cost
+    ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+    // Net transfer cost
+    ioCost += cardinality * averageTupleSize * NET_COST;
+    return ioCost;
+  }
+
+  public static double computeMapJoinCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        cpuCost += cardinality;
+      }
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeMapJoinIOCost(
+      ImmutableList<Pair<Double,Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeBucketMapJoinCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        cpuCost += cardinality;
+      }
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeBucketMapJoinIOCost(
+      ImmutableList<Pair<Double,Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeSMBMapJoinCPUCost(
+      ImmutableList<Double> cardinalities) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      cpuCost += cardinalities.get(i) * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeSMBMapJoinIOCost(
+      ImmutableList<Pair<Double,Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672451)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java (working copy)
   private final Table hiveTblMetadata;
-  private final String tblAlias;
   private final ImmutableList<ColumnInfo> hiveNonPartitionCols;
+  private final ImmutableList<ColumnInfo> hivePartitionCols;
   private final ImmutableMap<Integer, ColumnInfo> hiveNonPartitionColsMap;
   private final ImmutableMap<Integer, ColumnInfo> hivePartitionColsMap;
-  private final int noOfProjs;
+  private final ImmutableList<VirtualColumn> hiveVirtualCols;
+ private final int noOfNonVirtualCols;
final HiveConf hiveConf;
private double rowCount = -1;
@@ -67,37 +77,65 @@
PrunedPartitionList partitionList;
   Map<String, PrunedPartitionList> partitionCache;
AtomicInteger noColsMissingStats;
+ private final String qbID;
protected static final Log LOG = LogFactory
.getLog(RelOptHiveTable.class
.getName());
-  public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, String tblAlias, RelDataType rowType,
-      Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols,
-      List<ColumnInfo> hivePartitionCols, HiveConf hconf, Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats) {
+  public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
+      RelDataType rowType, Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols,
+      List<ColumnInfo> hivePartitionCols, List<VirtualColumn> hiveVirtualCols, HiveConf hconf,
+      Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats,
+ String qbID) {
super(calciteSchema, qualifiedTblName, rowType);
this.hiveTblMetadata = hiveTblMetadata;
- this.tblAlias = tblAlias;
this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols);
- this.hiveNonPartitionColsMap = getColInfoMap(hiveNonPartitionCols, 0);
- this.hivePartitionColsMap = getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size());
- this.noOfProjs = hiveNonPartitionCols.size() + hivePartitionCols.size();
+ this.hiveNonPartitionColsMap = HiveCalciteUtil.getColInfoMap(hiveNonPartitionCols, 0);
+ this.hivePartitionCols = ImmutableList.copyOf(hivePartitionCols);
+ this.hivePartitionColsMap = HiveCalciteUtil.getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size());
+ this.noOfNonVirtualCols = hiveNonPartitionCols.size() + hivePartitionCols.size();
+ this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols);
this.hiveConf = hconf;
this.partitionCache = partitionCache;
this.noColsMissingStats = noColsMissingStats;
+ this.qbID = qbID;
}
-  private static ImmutableMap<Integer, ColumnInfo> getColInfoMap(List<ColumnInfo> hiveCols,
-      int startIndx) {
-    Builder<Integer, ColumnInfo> bldr = ImmutableMap.<Integer, ColumnInfo> builder();
+ public RelOptHiveTable copy(RelDataType newRowType) {
+ // 1. Build map of column name to col index of original schema
+ // Assumption: Hive Table can not contain duplicate column names
+    Map<String, Integer> nameToColIndxMap = new HashMap<String, Integer>();
+ for (RelDataTypeField f : this.rowType.getFieldList()) {
+ nameToColIndxMap.put(f.getName(), f.getIndex());
+ }
- int indx = startIndx;
- for (ColumnInfo ci : hiveCols) {
- bldr.put(indx, ci);
- indx++;
+ // 2. Build nonPart/Part/Virtual column info for new RowSchema
+    List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<ColumnInfo>();
+    List<ColumnInfo> newHivePartitionCols = new ArrayList<ColumnInfo>();
+    List<VirtualColumn> newHiveVirtualCols = new ArrayList<VirtualColumn>();
+    Map<Integer, VirtualColumn> virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols,
+ this.noOfNonVirtualCols);
+ Integer originalColIndx;
+ ColumnInfo cInfo;
+ VirtualColumn vc;
+ for (RelDataTypeField f : newRowType.getFieldList()) {
+ originalColIndx = nameToColIndxMap.get(f.getName());
+ if ((cInfo = hiveNonPartitionColsMap.get(originalColIndx)) != null) {
+ newHiveNonPartitionCols.add(new ColumnInfo(cInfo));
+ } else if ((cInfo = hivePartitionColsMap.get(originalColIndx)) != null) {
+ newHivePartitionCols.add(new ColumnInfo(cInfo));
+ } else if ((vc = virtualColInfoMap.get(originalColIndx)) != null) {
+ newHiveVirtualCols.add(vc);
+ } else {
+ throw new RuntimeException("Copy encountered a column not seen in original TS");
}
+ }
- return bldr.build();
+ // 3. Build new Table
+ return new RelOptHiveTable(this.schema, this.name, newRowType,
+ this.hiveTblMetadata, newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols,
+ this.hiveConf, this.partitionCache, this.noColsMissingStats, qbID);
}
@Override
@@ -116,16 +154,57 @@
}
@Override
+  public List<RelCollation> getCollationList() {
+    ImmutableList.Builder<RelFieldCollation> collationList = new ImmutableList.Builder<RelFieldCollation>();
+    for (Order sortColumn : this.hiveTblMetadata.getSortCols()) {
+      for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
+        FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);
+        if (field.getName().equals(sortColumn.getCol())) {
+          Direction direction;
+          NullDirection nullDirection;
+          if (sortColumn.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+            direction = Direction.ASCENDING;
+            nullDirection = NullDirection.FIRST;
+          } else {
+            direction = Direction.DESCENDING;
+            nullDirection = NullDirection.LAST;
+          }
+          collationList.add(new RelFieldCollation(i, direction, nullDirection));
+          break;
+        }
+      }
+    }
+    return new ImmutableList.Builder<RelCollation>()
+        .add(RelCollationTraitDef.INSTANCE.canonize(
+          new HiveRelCollation(collationList.build())))
+        .build();
+  }
+
+  @Override
+  public RelDistribution getDistribution() {
+    ImmutableList.Builder<Integer> columnPositions = new ImmutableList.Builder<Integer>();
+    for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) {
+      for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
+        FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);
+        if (field.getName().equals(bucketColumn)) {
+          columnPositions.add(i);
+          break;
+        }
+      }
+    }
+    return new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+        columnPositions.build());
+  }
+
+  @Override
   public double getRowCount() {
     if (rowCount == -1) {
       if (null == partitionList) {
         // we are here either unpartitioned table or partitioned table with no predicates
         computePartitionList(hiveConf, null);
       }
       if (hiveTblMetadata.isPartitioned()) {
-        List<Long> rowCounts = StatsUtils.getBasicStatForPartitions(
- hiveTblMetadata, partitionList.getNotDeniedPartns(),
- StatsSetupConst.ROW_COUNT);
+      List<Long> rowCounts = StatsUtils.getBasicStatForPartitions(hiveTblMetadata,
+ partitionList.getNotDeniedPartns(), StatsSetupConst.ROW_COUNT);
rowCount = StatsUtils.getSumIgnoreNegatives(rowCounts);
} else {
@@ -143,19 +222,6 @@
return hiveTblMetadata;
}
- public String getTableAlias() {
- // NOTE: Calcite considers tbls to be equal if their names are the same. Hence
- // we need to provide Calcite the fully qualified table name (dbname.tblname)
- // and not the user provided aliases.
- // However in HIVE DB name can not appear in select list; in case of join
- // where table names differ only in DB name, Hive would require user
- // introducing explicit aliases for tbl.
- if (tblAlias == null)
- return hiveTblMetadata.getTableName();
- else
- return tblAlias;
- }
-
   private String getColNamesForLogging(Set<String> colLst) {
StringBuffer sb = new StringBuffer();
boolean firstEntry = true;
@@ -173,16 +239,21 @@
public void computePartitionList(HiveConf conf, RexNode pruneNode) {
try {
- if (!hiveTblMetadata.isPartitioned() || pruneNode == null || InputFinder.bits(pruneNode).length() == 0 ) {
- // there is no predicate on partitioning column, we need all partitions in this case.
- partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), partitionCache);
+ if (!hiveTblMetadata.isPartitioned() || pruneNode == null
+ || InputFinder.bits(pruneNode).length() == 0) {
+ // there is no predicate on partitioning column, we need all partitions
+ // in this case.
+ partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(),
+ partitionCache);
return;
}
// We have valid pruning expressions, only retrieve qualifying partitions
- ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(), true, getRelOptSchema().getTypeFactory()));
+ ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(),
+ true, this.getRelOptSchema().getTypeFactory()));
- partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(), partitionCache);
+ partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(),
+ partitionCache);
} catch (HiveException he) {
throw new RuntimeException(he);
}
@@ -289,10 +360,10 @@
if (colNamesFailedStats.isEmpty() && !partColNamesThatRqrStats.isEmpty()) {
ColStatistics cStats = null;
for (int i = 0; i < partColNamesThatRqrStats.size(); i++) {
- cStats = new ColStatistics(hiveTblMetadata.getTableName(),
- partColNamesThatRqrStats.get(i), hivePartitionColsMap.get(
- partColIndxsThatRqrStats.get(i)).getTypeName());
- cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),partColNamesThatRqrStats.get(i)));
+ cStats = new ColStatistics(hiveTblMetadata.getTableName(), partColNamesThatRqrStats.get(i),
+ hivePartitionColsMap.get(partColIndxsThatRqrStats.get(i)).getTypeName());
+ cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),
+ partColNamesThatRqrStats.get(i)));
hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats);
}
}
@@ -325,7 +396,7 @@
}
} else {
       List<Integer> pILst = new ArrayList<Integer>();
- for (Integer i = 0; i < noOfProjs; i++) {
+ for (Integer i = 0; i < noOfNonVirtualCols; i++) {
pILst.add(i);
}
       updateColStats(new HashSet<Integer>(pILst));
@@ -338,10 +409,8 @@
}
/*
- * use to check if a set of columns are all partition columns.
- * true only if:
- * - all columns in BitSet are partition
- * columns.
+ * use to check if a set of columns are all partition columns. true only if: -
+ * all columns in BitSet are partition columns.
*/
public boolean containsPartitionColumnsOnly(ImmutableBitSet cols) {
@@ -352,4 +421,32 @@
}
return true;
}
+
+  public List<VirtualColumn> getVirtualCols() {
+ return this.hiveVirtualCols;
}
+
+  public List<ColumnInfo> getPartColumns() {
+ return this.hivePartitionCols;
+ }
+
+  public List<ColumnInfo> getNonPartColumns() {
+ return this.hiveNonPartitionCols;
+ }
+
+ public String getQBID() {
+ return qbID;
+ }
+
+ public int getNoOfNonVirtualCols() {
+ return noOfNonVirtualCols;
+ }
+
+  public Map<Integer, ColumnInfo> getPartColInfoMap() {
+ return hivePartitionColsMap;
+ }
+
+  public Map<Integer, ColumnInfo> getNonPartColInfoMap() {
+ return hiveNonPartitionColsMap;
+ }
+}
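
Note: before the operator converter below, it may help to see the extended cost model's constants in action. Under the ratios declared in HiveCostUtil above (NET_COST is 150x CPU_COST, local write and read are each 4x NET_COST), the IO cost of sorting one relation, written to local disk, read back, then transferred, follows computeSortIOCost. A small worked example in plain Java, with assumed cardinality and tuple size, independent of the Hive/Calcite classes:

public class SortIOCostDemo {
  static final double CPU_COST = 1.0;
  static final double NET_COST = 150.0 * CPU_COST;
  static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
  static final double LOCAL_READ_COST = 4.0 * NET_COST;

  public static void main(String[] args) {
    double cardinality = 10000.0; // rows in the relation (assumed)
    double tupleSize = 128.0;     // average bytes per row (assumed)
    double ioCost = cardinality * tupleSize
        * (LOCAL_WRITE_COST + LOCAL_READ_COST + NET_COST);
    // 10000 * 128 * (600 + 600 + 150) = 1,728,000,000 cost units
    System.out.printf("sort IO cost: %.0f%n", ioCost);
  }
}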
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (revision 1672450)
@@ -0,0 +1,891 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.apache.hadoop.hive.ql.parse.JoinCond;
+import org.apache.hadoop.hive.ql.parse.JoinType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
+import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.UnionDesc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class HiveOpConverter {
+
+ private static final Log LOG = LogFactory.getLog(HiveOpConverter.class);
+
+ public static enum HIVEAGGOPMODE {
+ NO_SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan1MR
+ SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan2MR
+ NO_SKEW_MAP_SIDE_AGG, // Corresponds to SemAnalyzer
+ // genGroupByPlanMapAggrNoSkew
+ SKEW_MAP_SIDE_AGG // Corresponds to SemAnalyzer genGroupByPlanMapAggr2MR
+ };
+
+ // TODO: remove this after stashing only rqd pieces from opconverter
+ private final SemanticAnalyzer semanticAnalyzer;
+ private final HiveConf hiveConf;
+ private final UnparseTranslator unparseTranslator;
+  private final Map<String, Operator<? extends OperatorDesc>> topOps;
+ private final boolean strictMode;
+ private int reduceSinkTagGenerator;
+
+ public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
+      UnparseTranslator unparseTranslator, Map<String, Operator<? extends OperatorDesc>> topOps,
+ boolean strictMode) {
+ this.semanticAnalyzer = semanticAnalyzer;
+ this.hiveConf = hiveConf;
+ this.unparseTranslator = unparseTranslator;
+ this.topOps = topOps;
+ this.strictMode = strictMode;
+ this.reduceSinkTagGenerator = 0;
+ }
+
+ static class OpAttr {
+ final String tabAlias;
+    ImmutableList<Operator> inputs;
+    ImmutableMap<Integer, VirtualColumn> vcolMap;
+
+    OpAttr(String tabAlias, Map<Integer, VirtualColumn> vcolMap, Operator... inputs) {
+ this.tabAlias = tabAlias;
+ this.vcolMap = ImmutableMap.copyOf(vcolMap);
+ this.inputs = ImmutableList.copyOf(inputs);
+ }
+
+ private OpAttr clone(Operator... inputs) {
+ return new OpAttr(tabAlias, this.vcolMap, inputs);
+ }
+ }
+
+ public Operator convert(RelNode root) throws SemanticException {
+ OpAttr opAf = dispatch(root);
+ return opAf.inputs.get(0);
+ }
+
+ OpAttr dispatch(RelNode rn) throws SemanticException {
+ if (rn instanceof HiveTableScan) {
+ return visit((HiveTableScan) rn);
+ } else if (rn instanceof HiveProject) {
+ return visit((HiveProject) rn);
+ } else if (rn instanceof HiveJoin) {
+ return visit((HiveJoin) rn);
+ } else if (rn instanceof SemiJoin) {
+ SemiJoin sj = (SemiJoin) rn;
+ HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(),
+ sj.getCondition(), sj.getJoinType(), true);
+ return visit(hj);
+ } else if (rn instanceof HiveFilter) {
+ return visit((HiveFilter) rn);
+ } else if (rn instanceof HiveSort) {
+ return visit((HiveSort) rn);
+ } else if (rn instanceof HiveUnion) {
+ return visit((HiveUnion) rn);
+ } else if (rn instanceof LogicalExchange) {
+ return visit((LogicalExchange) rn);
+ } else if (rn instanceof HiveAggregate) {
+ return visit((HiveAggregate) rn);
+ }
+    LOG.error(rn.getClass().getCanonicalName() + " operator translation not supported"
+        + " yet in return path.");
+ return null;
+ }
+
+ /**
+ * TODO: 1. PPD needs to get pushed in to TS
+ *
+ * @param scanRel
+ * @return
+ */
+ OpAttr visit(HiveTableScan scanRel) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName()
+ + " with row type: [" + scanRel.getRowType() + "]");
+ }
+
+ RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable();
+
+ // 1. Setup TableScan Desc
+ // 1.1 Build col details used by scan
+    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
+    List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>(ht.getVirtualCols());
+ Map