Index: pom.xml
===================================================================
--- pom.xml (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ pom.xml (working copy)
@@ -100,7 +100,7 @@
3.4
1.7.5
0.8.0.RELEASE
- <calcite.version>1.1.0-incubating</calcite.version>
+ <calcite.version>1.2.0-incubating-SNAPSHOT</calcite.version>
3.2.6
3.2.10
3.2.9
Property changes on: hbase-handler/pom.xml
___________________________________________________________________
Modified: svn:mergeinfo
Reverse-merged /hive/branches/cbo/hbase-handler/pom.xml:r1605012-1627125
Merged /hive/trunk/hbase-handler/pom.xml:r1605012-1660746
Index: metastore/bin/.gitignore
===================================================================
--- metastore/bin/.gitignore (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ metastore/bin/.gitignore (working copy)
@@ -1 +1 @@
-# Dummy file to make Git recognize this empty directory
+/src/
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -702,6 +702,9 @@
// CBO related
HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."),
+ HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"),
+ EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on "
+ + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."),
// hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
// need to remove by hive .13. Also, do not change default (see SMB operator)
Index: ql/src/test/queries/clientpositive/cbo_join.q
===================================================================
--- ql/src/test/queries/clientpositive/cbo_join.q (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/test/queries/clientpositive/cbo_join.q (working copy)
@@ -4,6 +4,7 @@
set hive.stats.fetch.column.stats=true;
set hive.auto.convert.join=false;
+-- SORT_QUERY_RESULTS
-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key;
select cbo_t1.key from cbo_t1 join cbo_t3;
Index: ql/src/test/results/clientpositive/cbo_join.q.out
===================================================================
--- ql/src/test/results/clientpositive/cbo_join.q.out (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/test/results/clientpositive/cbo_join.q.out (working copy)
@@ -1,4 +1,5 @@
-PREHOOK: query: -- 4. Test Select + Join + TS
+PREHOOK: query: -- SORT_QUERY_RESULTS
+-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -6,7 +7,8 @@
PREHOOK: Input: default@cbo_t2
PREHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-POSTHOOK: query: -- 4. Test Select + Join + TS
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+-- 4. Test Select + Join + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@cbo_t1
@@ -122,46 +124,6 @@
POSTHOOK: Input: default@cbo_t1@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
-NULL
1
1
1
@@ -522,6 +484,46 @@
1
1
1
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
+NULL
PREHOOK: query: select cbo_t1.key from cbo_t1 join cbo_t3 where cbo_t1.key=cbo_t3.key and cbo_t1.key >= 1
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -632,8 +634,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -730,6 +730,8 @@
1 1
1 1
1 1
+NULL NULL
+NULL NULL
PREHOOK: query: select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 right outer join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -744,8 +746,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -847,6 +847,8 @@
NULL 2
NULL 2
NULL 2
+NULL NULL
+NULL NULL
PREHOOK: query: select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 full outer join cbo_t2 on cbo_t1.key=cbo_t2.key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -861,10 +863,6 @@
POSTHOOK: Input: default@cbo_t2
POSTHOOK: Input: default@cbo_t2@dt=2014
#### A masked pattern was here ####
-NULL NULL
-NULL NULL
-NULL NULL
-NULL NULL
1 1
1 1
1 1
@@ -966,6 +964,10 @@
NULL 2
NULL 2
NULL 2
+NULL NULL
+NULL NULL
+NULL NULL
+NULL NULL
PREHOOK: query: select b, cbo_t1.c, cbo_t2.p, q, cbo_t3.c_int from (select key as a, c_int as b, cbo_t1.c_float as c from cbo_t1) cbo_t1 join (select cbo_t2.key as p, cbo_t2.c_int as q, c_float as r from cbo_t2) cbo_t2 on cbo_t1.a=p join cbo_t3 on cbo_t1.a=key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -5334,8 +5336,6 @@
POSTHOOK: Input: default@cbo_t2@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL NULL NULL NULL
-NULL NULL NULL NULL
1 1 1 1
1 1 1 1
1 1 1 1
@@ -5870,6 +5870,8 @@
NULL NULL NULL NULL
NULL NULL NULL NULL
NULL NULL NULL NULL
+NULL NULL NULL NULL
+NULL NULL NULL NULL
PREHOOK: query: select b, cbo_t1.c, cbo_t2.p, q, cbo_t3.c_int from (select key as a, c_int as b, cbo_t1.c_float as c from cbo_t1) cbo_t1 full outer join (select cbo_t2.key as p, cbo_t2.c_int as q, c_float as r from cbo_t2) cbo_t2 on cbo_t1.a=p join cbo_t3 on cbo_t1.a=key
PREHOOK: type: QUERY
PREHOOK: Input: default@cbo_t1
@@ -6430,8 +6432,6 @@
POSTHOOK: Input: default@cbo_t2@dt=2014
POSTHOOK: Input: default@cbo_t3
#### A masked pattern was here ####
-NULL NULL NULL NULL
-NULL NULL NULL NULL
1 1 1 1
1 1 1 1
1 1 1 1
@@ -6966,6 +6966,8 @@
NULL NULL NULL NULL
NULL NULL NULL NULL
NULL NULL NULL NULL
+NULL NULL NULL NULL
+NULL NULL NULL NULL
PREHOOK: query: -- 5. Test Select + Join + FIL + TS
select cbo_t1.c_int, cbo_t2.c_int from cbo_t1 join cbo_t2 on cbo_t1.key=cbo_t2.key where (cbo_t1.c_int + cbo_t2.c_int == 2) and (cbo_t1.c_int > 0 or cbo_t2.c_float >= 0)
PREHOOK: type: QUERY
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy)
@@ -531,14 +531,15 @@
Operator<? extends OperatorDesc> child = op.getChildOperators().get(0);
- List<String> childCols;
+ List<String> childCols = null;
if (child instanceof CommonJoinOperator) {
- childCols = cppCtx.getJoinPrunedColLists().get(child)
+ childCols = cppCtx.getJoinPrunedColLists().get(child) == null
+ ? null : cppCtx.getJoinPrunedColLists().get(child)
.get((byte) conf.getTag());
} else {
childCols = cppCtx.getPrunedColList(child);
+ }
- }
List<ExprNodeDesc> valCols = conf.getValueCols();
List<String> valColNames = conf.getOutputValueColumnNames();
@@ -749,6 +750,7 @@
conf.setOutputColumnNames(newOutputColumnNames);
handleChildren(op, cols, cppCtx);
}
+
return null;
}
@@ -971,12 +973,12 @@
.getChildOperators();
LOG.info("JOIN " + op.getIdentifier() + " oldExprs: " + conf.getExprs());
+
List<String> childColLists = cppCtx.genColLists(op);
if (childColLists == null) {
return;
}
-
Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
for (byte tag : conf.getTagOrder()) {
prunedColLists.put(tag, new ArrayList<String>());
@@ -1076,6 +1078,7 @@
}
LOG.info("JOIN " + op.getIdentifier() + " newExprs: " + conf.getExprs());
+
op.setColumnExprMap(newColExprMap);
conf.setOutputColumnNames(outputCols);
op.getSchema().setSignature(rs);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (working copy)
@@ -242,4 +242,4 @@
return null;
}
}
-}
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy)
@@ -142,7 +142,9 @@
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
transformations.add(new ReduceSinkDeDuplication());
}
+ if(!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
transformations.add(new NonBlockingOpDeDupProc());
+ }
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEIDENTITYPROJECTREMOVER)) {
transformations.add(new IdentityProjectRemover());
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelDistribution.java (revision 1672174)
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.util.mapping.Mappings.TargetMapping;
+
+public class HiveRelDistribution implements RelDistribution {
+
+ List<Integer> keys;
+ RelDistribution.Type type;
+
+ public HiveRelDistribution(Type type, List<Integer> keys) {
+ this.type = type;
+ this.keys = keys;
+ }
+
+ @Override
+ public RelTraitDef<?> getTraitDef() {
+ return RelDistributionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ if (trait == this) {
+ return true;
+ }
+ switch (((RelDistribution)trait).getType()) {
+ case HASH_DISTRIBUTED :
+ return this.getKeys().equals(((RelDistribution)trait).getKeys());
+ default:
+ throw new RuntimeException("Other distributions are not used yet.");
+ }
+ }
+
+ @Override
+ public RelDistribution apply(TargetMapping mapping) {
+ if (keys.isEmpty()) {
+ return this;
+ }
+ return new HiveRelDistribution(type, keys);
+ }
+
+ @Override
+ public List<Integer> getKeys() {
+ return keys;
+ }
+
+ @Override
+ public Type getType() {
+ return type;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java (revision 1672174)
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelCollation extends RelCollationImpl {
+
+ public HiveRelCollation(ImmutableList<RelFieldCollation> fieldCollations) {
+ super(fieldCollations);
+ }
+
+}
+
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java (revision 1672174)
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.plan.Context;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+public class HiveConfigContext implements Context {
+ private HiveConf config;
+
+ public HiveConfigContext(HiveConf config) {
+ this.config = config;
+ }
+
+ public <T> T unwrap(Class<T> clazz) {
+ if (clazz.isInstance(config)) {
+ return clazz.cast(config);
+ }
+ return null;
+ }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java (working copy)
@@ -28,8 +28,10 @@
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
@@ -50,13 +52,18 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -319,11 +326,11 @@
return this.mapOfProjIndxInJoinSchemaToLeafPInfo;
}
- public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j) {
+ public static JoinPredicateInfo constructJoinPredicateInfo(Join j) {
return constructJoinPredicateInfo(j, j.getCondition());
}
- public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j, RexNode predicate) {
+ public static JoinPredicateInfo constructJoinPredicateInfo(Join j, RexNode predicate) {
JoinPredicateInfo jpi = null;
JoinLeafPredicateInfo jlpi = null;
List<JoinLeafPredicateInfo> equiLPIList = new ArrayList<JoinLeafPredicateInfo>();
@@ -432,6 +439,16 @@
.copyOf(projsFromRightPartOfJoinKeysInJoinSchema);
}
+ public List<RexNode> getJoinKeyExprs(int input) {
+ if (input == 0) {
+ return this.joinKeyExprsFromLeft;
+ }
+ if (input == 1) {
+ return this.joinKeyExprsFromRight;
+ }
+ return null;
+ }
+
public List<RexNode> getJoinKeyExprsFromLeft() {
return this.joinKeyExprsFromLeft;
}
@@ -461,7 +478,7 @@
return this.projsFromRightPartOfJoinKeysInJoinSchema;
}
- private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(HiveJoin j, RexNode pe) {
+ private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(Join j, RexNode pe) {
JoinLeafPredicateInfo jlpi = null;
List<Integer> filterNulls = new ArrayList<Integer>();
List<RexNode> joinKeyExprsFromLeft = new ArrayList<RexNode>();
@@ -561,6 +578,107 @@
return deterministic;
}
+ public static <T> ImmutableMap<Integer, T> getColInfoMap(List<T> hiveCols,
+ int startIndx) {
+ Builder<Integer, T> bldr = ImmutableMap.<Integer, T> builder();
+
+ int indx = startIndx;
+ for (T ci : hiveCols) {
+ bldr.put(indx, ci);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<Integer, VirtualColumn> shiftVColsMap(Map<Integer, VirtualColumn> hiveVCols,
+ int shift) {
+ Builder<Integer, VirtualColumn> bldr = ImmutableMap.<Integer, VirtualColumn> builder();
+
+ for (Integer pos : hiveVCols.keySet()) {
+ bldr.put(shift + pos, hiveVCols.get(pos));
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<Integer, VirtualColumn> getVColsMap(List<VirtualColumn> hiveVCols,
+ int startIndx) {
+ Builder<Integer, VirtualColumn> bldr = ImmutableMap.<Integer, VirtualColumn> builder();
+
+ int indx = startIndx;
+ for (VirtualColumn vc : hiveVCols) {
+ bldr.put(indx, vc);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<String, Integer> getColNameIndxMap(List<FieldSchema> tableFields) {
+ Builder<String, Integer> bldr = ImmutableMap.<String, Integer> builder();
+
+ int indx = 0;
+ for (FieldSchema fs : tableFields) {
+ bldr.put(fs.getName(), indx);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableMap<String, Integer> getRowColNameIndxMap(List<RelDataTypeField> rowFields) {
+ Builder<String, Integer> bldr = ImmutableMap.<String, Integer> builder();
+
+ int indx = 0;
+ for (RelDataTypeField rdt : rowFields) {
+ bldr.put(rdt.getName(), indx);
+ indx++;
+ }
+
+ return bldr.build();
+ }
+
+ public static ImmutableList<RexNode> getInputRef(List<Integer> inputRefs, RelNode inputRel) {
+ ImmutableList.Builder<RexNode> bldr = ImmutableList.<RexNode> builder();
+ for (int i : inputRefs) {
+ bldr.add(new RexInputRef(i, (RelDataType) inputRel.getRowType().getFieldList().get(i).getType()));
+ }
+ return bldr.build();
+ }
+
+ public static ExprNodeDesc getExprNode(Integer inputRefIndx, RelNode inputRel,
+ ExprNodeConverter exprConv) {
+ ExprNodeDesc exprNode = null;
+ RexNode rexInputRef = new RexInputRef(inputRefIndx, (RelDataType) inputRel.getRowType()
+ .getFieldList().get(inputRefIndx).getType());
+ exprNode = rexInputRef.accept(exprConv);
+
+ return exprNode;
+ }
+
+ public static List<ExprNodeDesc> getExprNodes(List<Integer> inputRefs, RelNode inputRel,
+ String inputTabAlias) {
+ List<ExprNodeDesc> exprNodes = new ArrayList<ExprNodeDesc>();
+ List<RexNode> rexInputRefs = getInputRef(inputRefs, inputRel);
+ // TODO: Change ExprNodeConverter to be independent of Partition Expr
+ ExprNodeConverter exprConv = new ExprNodeConverter(inputTabAlias, inputRel.getRowType(), false, inputRel.getCluster().getTypeFactory());
+ for (RexNode iRef : rexInputRefs) {
+ exprNodes.add(iRef.accept(exprConv));
+ }
+ return exprNodes;
+ }
+
+ public static List<String> getFieldNames(List<Integer> inputRefs, RelNode inputRel) {
+ List<String> fieldNames = new ArrayList<String>();
+ List<String> schemaNames = inputRel.getRowType().getFieldNames();
+ for (Integer iRef : inputRefs) {
+ fieldNames.add(schemaNames.get(iRef));
+ }
+
+ return fieldNames;
+ }
+
/**
* Walks over an expression and determines whether it is constant.
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java (revision 1672174)
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdDistribution {
+
+ public static final RelMetadataProvider SOURCE =
+ ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.DISTRIBUTION.method, new HiveRelMdDistribution()),
+ RelMdDistribution.SOURCE));
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdDistribution() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public RelDistribution distribution(HiveAggregate aggregate) {
+ return new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED,
+ aggregate.getGroupSet().asList());
+ }
+
+ public RelDistribution distribution(HiveJoin join) {
+ // Compute distribution
+ ImmutableList.Builder<Integer> keysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ ImmutableList.Builder<Integer> leftKeysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ ImmutableList.Builder<Integer> rightKeysListBuilder =
+ new ImmutableList.Builder<Integer>();
+ JoinPredicateInfo joinPredInfo =
+ HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+ for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+ JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+ getEquiJoinPredicateElements().get(i);
+ for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+ keysListBuilder.add(leftPos);
+ leftKeysListBuilder.add(leftPos);
+ }
+ for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+ keysListBuilder.add(rightPos);
+ rightKeysListBuilder.add(rightPos);
+ }
+ }
+
+ RelDistribution distribution;
+ switch (join.getJoinAlgorithm()) {
+ case SMB_JOIN:
+ case BUCKET_JOIN:
+ case COMMON_JOIN:
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, keysListBuilder.build());
+ break;
+ case MAP_JOIN:
+ // Keep buckets from the streaming relation
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, leftKeysListBuilder.build());
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ distribution = new HiveRelDistribution(
+ RelDistribution.Type.HASH_DISTRIBUTED, rightKeysListBuilder.build());
+ } else {
+ distribution = null;
+ }
+ break;
+ default:
+ distribution = null;
+ }
+ return distribution;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java (revision 1672174)
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdCollation {
+
+ public static final RelMetadataProvider SOURCE =
+ ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.COLLATIONS.method, new HiveRelMdCollation()),
+ RelMdCollation.SOURCE));
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdCollation() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public ImmutableList<RelCollation> collations(HiveAggregate aggregate) {
+ // Compute collations
+ ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ for (int pos : aggregate.getGroupSet().asList()) {
+ final RelFieldCollation fieldCollation = new RelFieldCollation(pos);
+ collationListBuilder.add(fieldCollation);
+ }
+ // Return aggregate collations
+ return ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(collationListBuilder.build())));
+ }
+
+ public ImmutableList<RelCollation> collations(HiveJoin join) {
+ // Compute collations
+ ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ ImmutableList.Builder<RelFieldCollation> leftCollationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ ImmutableList.Builder<RelFieldCollation> rightCollationListBuilder =
+ new ImmutableList.Builder<RelFieldCollation>();
+ JoinPredicateInfo joinPredInfo =
+ HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+ for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+ JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+ getEquiJoinPredicateElements().get(i);
+ for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+ final RelFieldCollation leftFieldCollation = new RelFieldCollation(leftPos);
+ collationListBuilder.add(leftFieldCollation);
+ leftCollationListBuilder.add(leftFieldCollation);
+ }
+ for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+ final RelFieldCollation rightFieldCollation = new RelFieldCollation(rightPos);
+ collationListBuilder.add(rightFieldCollation);
+ rightCollationListBuilder.add(rightFieldCollation);
+ }
+ }
+
+ // Return join collations
+ final ImmutableList<RelCollation> collation;
+ switch (join.getJoinAlgorithm()) {
+ case SMB_JOIN:
+ case COMMON_JOIN:
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(collationListBuilder.build())));
+ break;
+ case BUCKET_JOIN:
+ case MAP_JOIN:
+ // Keep order from the streaming relation
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(leftCollationListBuilder.build())));
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ collation = ImmutableList.of(
+ RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(rightCollationListBuilder.build())));
+ } else {
+ collation = null;
+ }
+ break;
+ default:
+ collation = null;
+ }
+ return collation;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java (revision 1672174)
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdParallelism;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+
+public class HiveRelMdParallelism extends RelMdParallelism {
+
+ private final Double maxSplitSize;
+
+ //~ Constructors -----------------------------------------------------------
+
+ public HiveRelMdParallelism(Double maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
+ }
+
+ public RelMetadataProvider getMetadataProvider() {
+ return ReflectiveRelMetadataProvider.reflectiveSource(this,
+ BuiltInMethod.IS_PHASE_TRANSITION.method,
+ BuiltInMethod.SPLIT_COUNT.method);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ public Boolean isPhaseTransition(HiveJoin join) {
+ // As Exchange operator is introduced later on, we make a
+ // common join operator create a new stage for the moment
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ return true;
+ }
+ return false;
+ }
+
+ public Boolean isPhaseTransition(HiveSort sort) {
+ // As Exchange operator is introduced later on, we make a
+ // sort operator create a new stage for the moment
+ return true;
+ }
+
+ public Integer splitCount(HiveJoin join) {
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ return splitCountRepartition(join);
+ }
+ else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.SMB_JOIN) {
+ RelNode largeInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ largeInput = join.getLeft();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ largeInput = join.getRight();
+ } else {
+ return null;
+ }
+ return splitCount(largeInput);
+ }
+ return null;
+ }
+
+ public Integer splitCount(HiveTableScan scan) {
+ RelOptHiveTable table = (RelOptHiveTable) scan.getTable();
+ return table.getHiveTableMD().getNumBuckets();
+ }
+
+ public Integer splitCount(RelNode rel) {
+ Boolean newPhase = RelMetadataQuery.isPhaseTransition(rel);
+
+ if (newPhase == null) {
+ return null;
+ }
+
+ if (newPhase) {
+ // We repartition: new number of splits
+ return splitCountRepartition(rel);
+ }
+
+ // We do not repartition: take number of splits from children
+ Integer splitCount = 0;
+ for (RelNode input : rel.getInputs()) {
+ splitCount += RelMetadataQuery.splitCount(input);
+ }
+ return splitCount;
+ }
+
+ public Integer splitCountRepartition(RelNode rel) {
+ // We repartition: new number of splits
+ final Double averageRowSize = RelMetadataQuery.getAverageRowSize(rel);
+ final Double rowCount = RelMetadataQuery.getRowCount(rel);
+ if (averageRowSize == null || rowCount == null) {
+ return null;
+ }
+ final Double totalSize = averageRowSize * rowCount;
+ final Double splitCount = totalSize / maxSplitSize;
+ return splitCount.intValue();
+ }
+
+}
+
+// End HiveRelMdParallelism.java
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java (revision 1672174)
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdMemory;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+
+public class HiveRelMdMemory extends RelMdMemory {
+
+ private static final HiveRelMdMemory INSTANCE = new HiveRelMdMemory();
+
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+ BuiltInMethod.MEMORY.method,
+ BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE.method,
+ BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE_SPLIT.method);
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdMemory() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public Double memory(HiveTableScan tableScan) {
+ return 0.0d;
+ }
+
+ public Double memory(HiveAggregate aggregate) {
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+ final Double rowCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ return avgRowSize * rowCount;
+ }
+
+ public Double memory(HiveFilter filter) {
+ return 0.0;
+ }
+
+ public Double memory(HiveJoin join) {
+ Double memory = 0.0;
+ if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+ // Left side
+ final Double leftAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double leftRowCount = RelMetadataQuery.getRowCount(join.getLeft());
+ if (leftAvgRowSize == null || leftRowCount == null) {
+ return null;
+ }
+ memory += leftAvgRowSize * leftRowCount;
+ // Right side
+ final Double rightAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ final Double rightRowCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (rightAvgRowSize == null || rightRowCount == null) {
+ return null;
+ }
+ memory += rightAvgRowSize * rightRowCount;
+ } else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ RelNode inMemoryInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ inMemoryInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ inMemoryInput = join.getLeft();
+ } else {
+ return null;
+ }
+ // Result
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(inMemoryInput);
+ final Double rowCount = RelMetadataQuery.getRowCount(inMemoryInput);
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ memory = avgRowSize * rowCount;
+ }
+ return memory;
+ }
+
+ public Double cumulativeMemoryWithinPhaseSplit(HiveJoin join) {
+ if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+ join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ // Check streaming side
+ RelNode inMemoryInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ inMemoryInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ inMemoryInput = join.getLeft();
+ } else {
+ return null;
+ }
+
+ if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN) {
+ // If simple map join, the whole relation goes in memory
+ return RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+ }
+ else if (join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+ // If bucket map join, only a split goes in memory
+ final Double memoryInput =
+ RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+ final Integer splitCount = RelMetadataQuery.splitCount(inMemoryInput);
+ if (memoryInput == null || splitCount == null) {
+ return null;
+ }
+ return memoryInput / splitCount;
+ }
+ }
+ // Else, we fall back to default
+ return super.cumulativeMemoryWithinPhaseSplit(join);
+ }
+
+ public Double memory(HiveLimit limit) {
+ return 0.0;
+ }
+
+ public Double memory(HiveProject project) {
+ return 0.0;
+ }
+
+ public Double memory(HiveSort sort) {
+ if (sort.getCollation() != RelCollations.EMPTY) {
+ // It sorts
+ final Double avgRowSize = RelMetadataQuery.getAverageRowSize(sort.getInput());
+ final Double rowCount = RelMetadataQuery.getRowCount(sort.getInput());
+ if (avgRowSize == null || rowCount == null) {
+ return null;
+ }
+ return avgRowSize * rowCount;
+ }
+ // It does not sort, memory footprint is zero
+ return 0.0;
+ }
+
+ public Double memory(HiveUnion union) {
+ return 0.0;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdSize.java (revision 1672174)
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdSize;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdSize extends RelMdSize {
+
+ private static final HiveRelMdSize INSTANCE = new HiveRelMdSize();
+
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+ BuiltInMethod.AVERAGE_COLUMN_SIZES.method,
+ BuiltInMethod.AVERAGE_ROW_SIZE.method);
+
+ //~ Constructors -----------------------------------------------------------
+
+ private HiveRelMdSize() {}
+
+ //~ Methods ----------------------------------------------------------------
+
+ public List<Double> averageColumnSizes(HiveTableScan scan) {
+ List<Integer> neededcolsLst = scan.getNeededColIndxsFrmReloptHT();
+ Set<Integer> needColsSet = new HashSet<Integer>(neededcolsLst);
+ List<ColStatistics> columnStatistics = ((RelOptHiveTable) scan.getTable())
+ .getColStat(neededcolsLst);
+
+ // Obtain list of col stats, or use default if they are not available
+ final ImmutableList.Builder<Double> list = ImmutableList.builder();
+ int indxRqdCol = 0;
+ int nFields = scan.getRowType().getFieldCount();
+ for (int i = 0; i < nFields; i++) {
+ if (needColsSet.contains(i)) {
+ ColStatistics columnStatistic = columnStatistics.get(indxRqdCol);
+ indxRqdCol++;
+ if (columnStatistic == null) {
+ RelDataTypeField field = scan.getPrunedRowType().getFieldList().get(i);
+ list.add(averageTypeValueSize(field.getType()));
+ } else {
+ list.add(columnStatistic.getAvgColLen());
+ }
+ } else {
+ list.add(new Double(0));
+ }
+ }
+
+ return list.build();
+ }
+
+ public List<Double> averageColumnSizes(HiveJoin rel) {
+ final RelNode left = rel.getLeft();
+ final RelNode right = rel.getRight();
+ final List<Double> lefts =
+ RelMetadataQuery.getAverageColumnSizes(left);
+ List<Double> rights = null;
+ if (!rel.isLeftSemiJoin()) {
+ rights = RelMetadataQuery.getAverageColumnSizes(right);
+ }
+ if (lefts == null && rights == null) {
+ return null;
+ }
+ final int fieldCount = rel.getRowType().getFieldCount();
+ Double[] sizes = new Double[fieldCount];
+ if (lefts != null) {
+ lefts.toArray(sizes);
+ }
+ if (rights != null) {
+ final int leftCount = left.getRowType().getFieldCount();
+ for (int i = 0; i < rights.size(); i++) {
+ sizes[leftCount + i] = rights.get(i);
+ }
+ }
+ return ImmutableNullableList.copyOf(sizes);
+ }
+
+ // TODO: remove when averageTypeValueSize method RelMdSize
+ // supports all types
+ public Double averageTypeValueSize(RelDataType type) {
+ switch (type.getSqlTypeName()) {
+ case BOOLEAN:
+ case TINYINT:
+ return 1d;
+ case SMALLINT:
+ return 2d;
+ case INTEGER:
+ case FLOAT:
+ case REAL:
+ case DECIMAL:
+ case DATE:
+ case TIME:
+ return 4d;
+ case BIGINT:
+ case DOUBLE:
+ case TIMESTAMP:
+ case INTERVAL_DAY_TIME:
+ case INTERVAL_YEAR_MONTH:
+ return 8d;
+ case BINARY:
+ return (double) type.getPrecision();
+ case VARBINARY:
+ return Math.min((double) type.getPrecision(), 100d);
+ case CHAR:
+ return (double) type.getPrecision() * BYTES_PER_CHARACTER;
+ case VARCHAR:
+ // Even in large (say VARCHAR(2000)) columns most strings are small
+ return Math.min((double) type.getPrecision() * BYTES_PER_CHARACTER, 100d);
+ case ROW:
+ Double average = 0.0;
+ for (RelDataTypeField field : type.getFieldList()) {
+ average += averageTypeValueSize(field.getType());
+ }
+ return average;
+ default:
+ return null;
+ }
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
import java.util.ArrayList;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java (working copy)
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
import java.util.BitSet;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java (working copy)
@@ -20,21 +20,64 @@
import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveDefaultCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveRelMdCost;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveOnTezCostModel;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdMemory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdParallelism;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdRowCount;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSelectivity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSize;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdUniqueKeys;
import com.google.common.collect.ImmutableList;
public class HiveDefaultRelMetadataProvider {
- private HiveDefaultRelMetadataProvider() {
+
+ private final HiveConf hiveConf;
+
+
+ public HiveDefaultRelMetadataProvider(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
}
- public static final RelMetadataProvider INSTANCE = ChainedRelMetadataProvider.of(ImmutableList
- .of(HiveRelMdDistinctRowCount.SOURCE,
+ public RelMetadataProvider getMetadataProvider() {
+
+ // Create cost metadata provider
+ final HiveCostModel cm;
+ if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) {
+ final Double maxMemory = (double) HiveConf.getLongVar(
+ this.hiveConf,
+ HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ cm = new HiveOnTezCostModel(maxMemory);
+ } else {
+ cm = new HiveDefaultCostModel();
+ }
+
+ // Get max split size for HiveRelMdParallelism
+ final Double maxSplitSize = (double) HiveConf.getLongVar(
+ this.hiveConf,
+ HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+
+ // Return MD provider
+ return ChainedRelMetadataProvider.of(ImmutableList
+ .of(new HiveRelMdCost(cm).getMetadataProvider(),
+ HiveRelMdDistinctRowCount.SOURCE,
HiveRelMdSelectivity.SOURCE,
HiveRelMdRowCount.SOURCE,
HiveRelMdUniqueKeys.SOURCE,
+ HiveRelMdSize.SOURCE,
+ HiveRelMdMemory.SOURCE,
+ new HiveRelMdParallelism(maxSplitSize).getMetadataProvider(),
+ HiveRelMdDistribution.SOURCE,
+ HiveRelMdCollation.SOURCE,
new DefaultRelMetadataProvider()));
}
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java (revision 1672174)
@@ -0,0 +1,49 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SortExchange;
+
+public class HiveSortExchange extends SortExchange {
+
+ private HiveSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, RelDistribution distribution, RelCollation collation) {
+ super(cluster, traitSet, input, distribution, collation);
+ }
+
+ public HiveSortExchange(RelInput input) {
+ super(input);
+ }
+
+ /**
+ * Creates a HiveSortExchange.
+ *
+ * @param input Input relational expression
+ * @param distribution Distribution specification
+ * @param collation Collation specification
+ */
+ public static HiveSortExchange create(RelNode input,
+ RelDistribution distribution, RelCollation collation) {
+ RelOptCluster cluster = input.getCluster();
+ distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+ RelTraitSet traitSet =
+ input.getTraitSet().replace(Convention.NONE).replace(distribution);
+ collation = RelCollationTraitDef.INSTANCE.canonize(collation);
+ return new HiveSortExchange(cluster, traitSet, input, distribution, collation);
+ }
+
+ @Override
+ public SortExchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution,
+ RelCollation newCollation) {
+ return new HiveSortExchange(getCluster(), traitSet, newInput,
+ newDistribution, newCollation);
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java (working copy)
@@ -17,21 +17,34 @@
*/
package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+
/**
* Relational expression representing a scan of a HiveDB collection.
*
@@ -42,6 +55,9 @@
*/
public class HiveTableScan extends TableScan implements HiveRelNode {
+ private final RelDataType hiveTableScanRowType;
+ private final ImmutableList<Integer> neededColIndxsFrmReloptHT;
+
/**
* Creates a HiveTableScan.
*
@@ -54,10 +70,16 @@
* @param table
* HiveDB table
*/
- public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table,
- RelDataType rowtype) {
+ public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table) {
+ this(cluster, traitSet, table, table.getRowType());
+ }
+
+ private HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table,
+ RelDataType newRowtype) {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), table);
assert getConvention() == HiveRelNode.CONVENTION;
+ this.hiveTableScanRowType = newRowtype;
+ this.neededColIndxsFrmReloptHT = buildNeededColIndxsFrmReloptHT(table.getRowType(), newRowtype);
}
@Override
@@ -66,9 +88,21 @@
return this;
}
+ /**
+ * Copy TableScan operator with a new Row Schema. The new Row Schema can only
+ * be a subset of this TS schema.
+ *
+ * @param newRowtype
+ * @return
+ */
+ public HiveTableScan copy(RelDataType newRowtype) {
+ return new HiveTableScan(getCluster(), getTraitSet(), ((RelOptHiveTable) table),
+ newRowtype);
+ }
+
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
@Override
@@ -89,4 +123,62 @@
public List<ColStatistics> getColStat(List<Integer> projIndxLst) {
return ((RelOptHiveTable) table).getColStat(projIndxLst);
}
-}
\ No newline at end of file
+
+ @Override
+ public RelNode project(ImmutableBitSet fieldsUsed, Set<RelDataTypeField> extraFields,
+ RelFactories.ProjectFactory projectFactory) {
+
+ // 1. If the schema is the same then bail out
+ final int fieldCount = getRowType().getFieldCount();
+ if (fieldsUsed.equals(ImmutableBitSet.range(fieldCount)) && extraFields.isEmpty()) {
+ return this;
+ }
+
+ // 2. Make sure there is no dynamic addition of virtual cols
+ if (extraFields != null && !extraFields.isEmpty()) {
+ throw new RuntimeException("Hive TS does not support adding virtual columns dynamically");
+ }
+
+ // 3. Create new TS schema that is a subset of original
+ final List<RelDataTypeField> fields = getRowType().getFieldList();
+ List<RelDataType> fieldTypes = new LinkedList<RelDataType>();
+ List<String> fieldNames = new LinkedList<String>();
+ List<RexNode> exprList = new ArrayList<RexNode>();
+ RexBuilder rexBuilder = getCluster().getRexBuilder();
+ for (int i : fieldsUsed) {
+ RelDataTypeField field = fields.get(i);
+ fieldTypes.add(field.getType());
+ fieldNames.add(field.getName());
+ exprList.add(rexBuilder.makeInputRef(this, i));
+ }
+
+ // 4. Build new TS
+ HiveTableScan newHT = copy(getCluster().getTypeFactory().createStructType(fieldTypes,
+ fieldNames));
+
+ // 5. Add Proj on top of TS
+ return projectFactory.createProject(newHT, exprList, new ArrayList<String>(fieldNames));
+ }
+
+ public List<Integer> getNeededColIndxsFrmReloptHT() {
+ return neededColIndxsFrmReloptHT;
+ }
+
+ public RelDataType getPrunedRowType() {
+ return hiveTableScanRowType;
+ }
+
+ private static ImmutableList<Integer> buildNeededColIndxsFrmReloptHT(RelDataType htRowtype,
+ RelDataType scanRowType) {
+ Builder<Integer> neededColIndxsFrmReloptHTBldr = new ImmutableList.Builder<Integer>();
+ Map<String, Integer> colNameToPosInReloptHT = HiveCalciteUtil.getRowColNameIndxMap(htRowtype
+ .getFieldList());
+ List<String> colNamesInScanRowType = scanRowType.getFieldNames();
+
+ for (int i = 0; i < colNamesInScanRowType.size(); i++) {
+ neededColIndxsFrmReloptHTBldr.add(colNameToPosInReloptHT.get(colNamesInScanRowType.get(i)));
+ }
+
+ return neededColIndxsFrmReloptHTBldr.build();
+ }
+}
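Note on the column-pruning change above: buildNeededColIndxsFrmReloptHT records, for every column kept in the pruned scan row type, its position in the underlying RelOptHiveTable. A small, self-contained sketch of that mapping follows; the column names and the class name NeededColIndexDemo are hypothetical and only illustrate the lookup, they are not part of the patch.

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class NeededColIndexDemo {
      public static void main(String[] args) {
        List<String> tableCols = Arrays.asList("key", "value", "c_int", "dt"); // full table schema
        List<String> scanCols  = Arrays.asList("key", "c_int");                // pruned TS schema

        // Name -> position in the original table row type
        Map<String, Integer> colNameToPos = new LinkedHashMap<String, Integer>();
        for (int i = 0; i < tableCols.size(); i++) {
          colNameToPos.put(tableCols.get(i), i);
        }

        // Needed indexes, in pruned-schema order: key -> 0, c_int -> 2
        for (String col : scanCols) {
          System.out.println(col + " -> " + colNameToPos.get(col));
        }
      }
    }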
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java (working copy)
@@ -25,9 +25,9 @@
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
public class HiveLimit extends SingleRel implements HiveRelNode {
private final RexNode offset;
@@ -52,6 +52,6 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java (working copy)
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
@@ -25,7 +27,9 @@
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.RelFactories.JoinFactory;
@@ -33,19 +37,15 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
//TODO: Should we convert MultiJoin to be a child of HiveJoin
public class HiveJoin extends Join implements HiveRelNode {
- // NOTE: COMMON_JOIN & SMB_JOIN are Sort Merge Join (in case of COMMON_JOIN
- // each parallel computation handles multiple splits where as in case of SMB
- // each parallel computation handles one bucket). MAP_JOIN and BUCKET_JOIN is
- // hash joins where MAP_JOIN keeps the whole data set of non streaming tables
- // in memory where as BUCKET_JOIN keeps only the b
- public enum JoinAlgorithm {
- NONE, COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN
- }
public enum MapJoinStreamingRelation {
NONE, LEFT_RELATION, RIGHT_RELATION
@@ -54,17 +54,20 @@
public static final JoinFactory HIVE_JOIN_FACTORY = new HiveJoinFactoryImpl();
private final boolean leftSemiJoin;
- private final JoinAlgorithm joinAlgorithm;
- //This will be used once we do Join Algorithm selection
- @SuppressWarnings("unused")
- private final MapJoinStreamingRelation mapJoinStreamingSide = MapJoinStreamingRelation.NONE;
+ private JoinAlgorithm joinAlgorithm;
+ private MapJoinStreamingRelation mapJoinStreamingSide;
+ private RelOptCost joinCost;
+ // Whether inputs are already sorted
+ private ImmutableBitSet sortedInputs;
public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode right,
RexNode condition, JoinRelType joinType, boolean leftSemiJoin) {
try {
Set<String> variablesStopped = Collections.emptySet();
- return new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
- JoinAlgorithm.NONE, null, leftSemiJoin);
+ HiveJoin join = new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
+ JoinAlgorithm.NONE, chooseStreamingSide(left,right), null, leftSemiJoin);
+ join.sortedInputs = checkInputsCorrectOrder(join);
+ return join;
} catch (InvalidRelException e) {
throw new RuntimeException(e);
}
@@ -72,11 +75,13 @@
protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
- JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin, boolean leftSemiJoin)
- throws InvalidRelException {
+ JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin,
+ ImmutableBitSet sortedInputs, boolean leftSemiJoin) throws InvalidRelException {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType,
variablesStopped);
this.joinAlgorithm = joinAlgo;
+ this.mapJoinStreamingSide = streamingSideForMapJoin;
+ this.sortedInputs = sortedInputs;
this.leftSemiJoin = leftSemiJoin;
}
@@ -90,7 +95,7 @@
try {
Set<String> variablesStopped = Collections.emptySet();
return new HiveJoin(getCluster(), traitSet, left, right, conditionExpr, joinType,
- variablesStopped, JoinAlgorithm.NONE, null, leftSemiJoin);
+ variablesStopped, joinAlgorithm, mapJoinStreamingSide, sortedInputs, leftSemiJoin);
} catch (InvalidRelException e) {
// Semantic error not possible. Must be a bug. Convert to
// internal error.
@@ -102,6 +107,22 @@
return joinAlgorithm;
}
+ public void setJoinAlgorithm(JoinAlgorithm joinAlgorithm) {
+ this.joinAlgorithm = joinAlgorithm;
+ }
+
+ public MapJoinStreamingRelation getMapJoinStreamingSide() {
+ return mapJoinStreamingSide;
+ }
+
+ public void setJoinCost(RelOptCost joinCost) {
+ this.joinCost = joinCost;
+ }
+
+ public ImmutableBitSet getSortedInputs() {
+ return sortedInputs;
+ }
+
public boolean isLeftSemiJoin() {
return leftSemiJoin;
}
@@ -111,11 +132,57 @@
*/
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- double leftRCount = RelMetadataQuery.getRowCount(getLeft());
- double rightRCount = RelMetadataQuery.getRowCount(getRight());
- return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
+ private static MapJoinStreamingRelation chooseStreamingSide(RelNode left,
+ RelNode right) {
+ Double leftInputSize = RelMetadataQuery.memory(left);
+ Double rightInputSize = RelMetadataQuery.memory(right);
+ if (leftInputSize == null && rightInputSize == null) {
+ return MapJoinStreamingRelation.NONE;
+ } else if (leftInputSize != null &&
+ (rightInputSize == null ||
+ (leftInputSize < rightInputSize))) {
+ return MapJoinStreamingRelation.RIGHT_RELATION;
+ } else if (rightInputSize != null &&
+ (leftInputSize == null ||
+ (rightInputSize <= leftInputSize))) {
+ return MapJoinStreamingRelation.LEFT_RELATION;
+ }
+ return MapJoinStreamingRelation.NONE;
+ }
+
+ private static ImmutableBitSet checkInputsCorrectOrder(HiveJoin join) {
+ ImmutableBitSet.Builder sortedInputs = new ImmutableBitSet.Builder();
+ JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+ constructJoinPredicateInfo(join);
+ List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+ for (int i=0; i groupSets,
List aggCalls) throws InvalidRelException {
super(cluster, TraitsUtil.getDefaultTraitSet(cluster), child, indicator, groupSet,
groupSets, aggCalls);
+ this.bucketedInput = checkInputCorrectBucketing(child, groupSet);
}
@Override
@@ -66,15 +69,28 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
+ private static boolean checkInputCorrectBucketing(RelNode child, ImmutableBitSet groupSet) {
+ return false;
+ //TODO: Enable this again
+ /*
+ return RelMetadataQuery.distribution(child).getKeys().
+ containsAll(groupSet.asList());
+ */
+ }
+
@Override
public double getRows() {
return RelMetadataQuery.getDistinctRowCount(this, groupSet, getCluster().getRexBuilder()
.makeLiteral(true));
}
+ public boolean isBucketedInput() {
+ return this.bucketedInput;
+ }
+
private static class HiveAggRelFactory implements AggregateFactory {
@Override
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java (working copy)
@@ -29,6 +29,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
@@ -42,7 +43,6 @@
import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
-
import com.google.common.collect.ImmutableList;
public class HiveProject extends Project implements HiveRelNode {
@@ -172,7 +172,7 @@
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return HiveCost.FACTORY.makeZeroCost();
+ return RelMetadataQuery.getNonCumulativeCost(this);
}
@Override
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java (revision 1672174)
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * HiveRelMdCost supplies the implementation of the cost model.
+ */
+public class HiveRelMdCost {
+
+ private final HiveCostModel hiveCostModel;
+
+ public HiveRelMdCost(HiveCostModel hiveCostModel) {
+ this.hiveCostModel = hiveCostModel;
+ }
+
+ public RelMetadataProvider getMetadataProvider() {
+ return ChainedRelMetadataProvider.of(
+ ImmutableList.of(
+ ReflectiveRelMetadataProvider.reflectiveSource(this,
+ BuiltInMethod.NON_CUMULATIVE_COST.method),
+ RelMdPercentageOriginalRows.SOURCE));
+ }
+
+ public RelOptCost getNonCumulativeCost(HiveAggregate aggregate) {
+ return hiveCostModel.getAggregateCost(aggregate);
+ }
+
+ public RelOptCost getNonCumulativeCost(HiveJoin join) {
+ return hiveCostModel.getJoinCost(join);
+ }
+
+ // Default case
+ public RelOptCost getNonCumulativeCost(RelNode rel) {
+ return hiveCostModel.getDefaultCost();
+ }
+
+}
+
+// End HiveRelMdCost.java
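HiveRelMdCost plugs the cost model into Calcite's reflective metadata machinery, so RelMetadataQuery.getNonCumulativeCost dispatches to the handlers above. A hedged sketch of how such a provider could be chained ahead of Calcite's defaults follows; the variables costModel and cluster, and the surrounding setup, are assumptions and not part of this patch.

    // Hedged sketch (not part of this patch): chain the Hive cost handlers in front
    // of Calcite's default metadata provider when the cluster is built.
    RelMetadataProvider chainedProvider = ChainedRelMetadataProvider.of(
        ImmutableList.of(
            new HiveRelMdCost(costModel).getMetadataProvider(),   // NON_CUMULATIVE_COST handlers
            DefaultRelMetadataProvider.INSTANCE));                // everything else
    cluster.setMetadataProvider(chainedProvider);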
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java (revision 1672174)
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.EnumSet;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+/**
+ * Default implementation of the cost model.
+ * Currently used by MR and Spark execution engines.
+ */
+public class HiveDefaultCostModel extends HiveCostModel {
+
+ @Override
+ public RelOptCost getDefaultCost() {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+ protected EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join) {
+ return EnumSet.of(JoinAlgorithm.NONE);
+ }
+
+ @Override
+ protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) {
+ RelOptCost algorithmCost;
+ switch (algorithm) {
+ case NONE:
+ algorithmCost = computeJoinCardinalityCost(join);
+ break;
+ default:
+ algorithmCost = null;
+ }
+ return algorithmCost;
+ }
+
+ private static RelOptCost computeJoinCardinalityCost(HiveJoin join) {
+ double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
+ }
+
+}
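A short worked example of the default model's join pricing (computeJoinCardinalityCost above); the row counts are hypothetical:

    // left input = 1000 rows, right input = 5000 rows
    // HiveCost.FACTORY.makeCost(1000 + 5000, 0.0, 0.0)  =>  {rows = 6000, cpu = 0, io = 0}
    // With zero CPU/IO everywhere, plan comparison under the default model still
    // reduces to comparing cardinalities.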
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java (revision 1672174)
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.EnumSet;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+/**
+ * Cost model interface.
+ */
+public abstract class HiveCostModel {
+
+ private static final Log LOG = LogFactory.getLog(HiveCostModel.class);
+
+ // NOTE: COMMON_JOIN & SMB_JOIN are Sort Merge Join (in case of COMMON_JOIN
+ // each parallel computation handles multiple splits where as in case of SMB
+ // each parallel computation handles one bucket). MAP_JOIN and BUCKET_JOIN is
+ // hash joins where MAP_JOIN keeps the whole data set of non streaming tables
+ // in memory where as BUCKET_JOIN keeps only the b
+ public enum JoinAlgorithm {
+ NONE, COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN
+ }
+
+ public abstract RelOptCost getDefaultCost();
+
+ public abstract RelOptCost getAggregateCost(HiveAggregate aggregate);
+
+ public RelOptCost getJoinCost(HiveJoin join) {
+ // Retrieve algorithms
+ EnumSet<JoinAlgorithm> possibleAlgorithms = getExecutableJoinAlgorithms(join);
+
+ // Select algorithm with min cost
+ JoinAlgorithm joinAlgorithm = null;
+ RelOptCost minJoinCost = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(join));
+ }
+ for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) {
+ RelOptCost joinCost = getJoinCost(join, possibleAlgorithm);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(possibleAlgorithm + " cost: " + joinCost);
+ }
+ if (minJoinCost == null || joinCost.isLt(minJoinCost) ) {
+ joinAlgorithm = possibleAlgorithm;
+ minJoinCost = joinCost;
+ }
+ }
+ join.setJoinAlgorithm(joinAlgorithm);
+ join.setJoinCost(minJoinCost);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(joinAlgorithm + " selected");
+ }
+
+ return minJoinCost;
+ }
+
+ /**
+ * Returns the possible algorithms for a given join operator.
+ *
+ * @param join the join operator
+ * @return a set containing all the possible join algorithms that can be
+ * executed for this join operator
+ */
+ abstract EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join);
+
+ /**
+ * Returns the cost for a given algorithm and execution engine.
+ *
+ * @param join the join operator
+ * @param algorithm the join algorithm
+ * @return the cost for the given algorithm, or null if the algorithm is not
+ * defined for this execution engine
+ */
+ abstract RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm);
+}
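The selection in getJoinCost above is a straightforward argmin over the executable algorithms. The following framework-free sketch mirrors that loop with a plain double in place of RelOptCost; the algorithm names, cost values, and the class name CheapestAlgorithmDemo are hypothetical.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CheapestAlgorithmDemo {
      public static void main(String[] args) {
        Map<String, Double> costs = new LinkedHashMap<String, Double>();
        costs.put("COMMON_JOIN", 9500.0);
        costs.put("MAP_JOIN", 4200.0);
        costs.put("BUCKET_JOIN", 4800.0);

        String best = null;
        Double bestCost = null;
        for (Map.Entry<String, Double> e : costs.entrySet()) {
          if (bestCost == null || e.getValue() < bestCost) {  // keep the strictly cheaper algorithm
            best = e.getKey();
            bestCost = e.getValue();
          }
        }
        System.out.println(best + " selected with cost " + bestCost);  // MAP_JOIN selected with cost 4200.0
      }
    }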
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java (revision 1672174)
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Cost model for Tez execution engine.
+ */
+public class HiveOnTezCostModel extends HiveCostModel {
+
+ private final Double maxMemory;
+
+
+ public HiveOnTezCostModel(Double maxMemory) {
+ this.maxMemory = maxMemory;
+ }
+
+ @Override
+ public RelOptCost getDefaultCost() {
+ return HiveCost.FACTORY.makeZeroCost();
+ }
+
+ @Override
+ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
+ if (aggregate.isBucketedInput()) {
+ return HiveCost.FACTORY.makeZeroCost();
+ } else {
+ // 1. Sum of input cardinalities
+ final Double rCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+ if (rCount == null) {
+ return null;
+ }
+ // 2. CPU cost = sorting cost
+ final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount);
+ // 3. IO cost = cost of writing intermediary results to local FS +
+ // cost of reading from local FS for transferring to GBy +
+ // cost of transferring map outputs to GBy operator
+ final Double rAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+ if (rAverageSize == null) {
+ return null;
+ }
+ final double ioCost = HiveCostUtil.computeSortIOCost(new Pair(rCount,rAverageSize));
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+ }
+
+ @Override
+ protected EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join) {
+ Set<JoinAlgorithm> possibleAlgorithms = new HashSet<JoinAlgorithm>();
+
+ // Check streaming side
+ RelNode smallInput;
+ if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+ smallInput = join.getRight();
+ } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+ smallInput = join.getLeft();
+ } else {
+ smallInput = null;
+ }
+
+ if (smallInput != null) {
+ // Requirements:
+ // - For SMB, sorted by their keys on both sides and bucketed.
+ // - For Bucket, bucketed by their keys on both sides. / Fitting in memory
+ // - For Map, no additional requirement. / Fitting in memory
+
+ // Get key columns
+ JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+ constructJoinPredicateInfo(join);
+ List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+ joinKeysInChildren.add(
+ ImmutableIntList.copyOf(
+ joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+ // Obtain number of buckets
+ Integer buckets = RelMetadataQuery.splitCount(smallInput);
+ // Obtain map algorithms for which smallest input fits in memory
+ boolean bucketFitsMemory = false;
+ boolean inputFitsMemory = false;
+ if (buckets != null) {
+ bucketFitsMemory = isFittingIntoMemory(maxMemory, smallInput, buckets);
+ }
+ inputFitsMemory = bucketFitsMemory ?
+ isFittingIntoMemory(maxMemory, smallInput, 1) : false;
+ boolean orderedBucketed = true;
+ boolean bucketed = true;
+ for (int i=0; i maxSize) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) {
+ RelOptCost algorithmCost;
+ switch (algorithm) {
+ case COMMON_JOIN:
+ algorithmCost = computeCostCommonJoin(join);
+ break;
+ case MAP_JOIN:
+ algorithmCost = computeCostMapJoin(join);
+ break;
+ case BUCKET_JOIN:
+ algorithmCost = computeCostBucketJoin(join);
+ break;
+ case SMB_JOIN:
+ algorithmCost = computeCostSMBJoin(join);
+ break;
+ default:
+ algorithmCost = null;
+ }
+ return algorithmCost;
+ }
+
+ private static RelOptCost computeCostCommonJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = sorting cost (for each relation) +
+ // total merge cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs());
+ // 3. IO cost = cost of writing intermediary results to local FS +
+ // cost of reading from local FS for transferring to join +
+ // cost of transferring map outputs to Join operator
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair(leftRCount,leftRAverageSize)).
+ add(new Pair(rightRCount,rightRAverageSize)).
+ build();
+ final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostMapJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(cardinalities, streaming);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair(leftRCount,leftRAverageSize)).
+ add(new Pair(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostBucketJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(cardinalities, streaming);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair(leftRCount,leftRAverageSize)).
+ add(new Pair(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+ private static RelOptCost computeCostSMBJoin(HiveJoin join) {
+ // 1. Sum of input cardinalities
+ final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+ final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+ if (leftRCount == null || rightRCount == null) {
+ return null;
+ }
+ final double rCount = leftRCount + rightRCount;
+ // 2. CPU cost = HashTable construction cost +
+ // join cost
+ ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+ add(leftRCount).
+ add(rightRCount).
+ build();
+ ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+ switch (join.getMapJoinStreamingSide()) {
+ case LEFT_RELATION:
+ streamingBuilder.set(0);
+ break;
+ case RIGHT_RELATION:
+ streamingBuilder.set(1);
+ break;
+ default:
+ return null;
+ }
+ ImmutableBitSet streaming = streamingBuilder.build();
+ final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(cardinalities);
+ // 3. IO cost = cost of transferring small tables to join node *
+ // degree of parallelism
+ final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+ final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+ if (leftRAverageSize == null || rightRAverageSize == null) {
+ return null;
+ }
+ ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+ add(new Pair(leftRCount,leftRAverageSize)).
+ add(new Pair(rightRCount,rightRAverageSize)).
+ build();
+ final int parallelism = RelMetadataQuery.splitCount(join) == null
+ ? 1 : RelMetadataQuery.splitCount(join);
+ final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
+ // 4. Result
+ return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+ }
+
+}
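For the sort-based costs above, each join input is charged for a local write, a local read back, and a network transfer, using the unit costs defined in HiveCostUtil later in this patch. A small runnable example of that arithmetic follows; the cardinality, row size, and class name SortIOCostDemo are hypothetical.

    public class SortIOCostDemo {
      public static void main(String[] args) {
        final double CPU_COST = 1.0;
        final double NET_COST = 150.0 * CPU_COST;          // 150
        final double LOCAL_WRITE_COST = 4.0 * NET_COST;    // 600
        final double LOCAL_READ_COST  = 4.0 * NET_COST;    // 600

        double cardinality = 10000d;   // rows in one join input
        double avgRowSize  = 100d;     // bytes per row

        double ioCost = cardinality * avgRowSize * LOCAL_WRITE_COST   // write intermediate to local FS
                      + cardinality * avgRowSize * LOCAL_READ_COST    // read it back
                      + cardinality * avgRowSize * NET_COST;          // ship it to the join
        System.out.println("sort IO cost per input = " + ioCost);     // 1.35E9 cost units
      }
    }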
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java (working copy)
@@ -90,22 +90,17 @@
return io;
}
- // TODO: If two cost is equal, could we do any better than comparing
- // cardinality (may be some other heuristics to break the tie)
public boolean isLe(RelOptCost other) {
- return this == other || this.rowCount <= other.getRows();
- /*
- * if (((this.dCpu + this.dIo) < (other.getCpu() + other.getIo())) ||
- * ((this.dCpu + this.dIo) == (other.getCpu() + other.getIo()) && this.dRows
- * <= other.getRows())) { return true; } else { return false; }
- */
+ if ( (this.cpu + this.io < other.getCpu() + other.getIo()) ||
+ ((this.cpu + this.io == other.getCpu() + other.getIo()) &&
+ (this.rowCount <= other.getRows()))) {
+ return true;
}
+ return false;
+ }
public boolean isLt(RelOptCost other) {
- return this.rowCount < other.getRows();
- /*
- * return isLe(other) && !equals(other);
- */
+ return isLe(other) && !equals(other);
}
public double getRows() {
@@ -113,21 +108,14 @@
}
public boolean equals(RelOptCost other) {
- return (this == other) || ((this.rowCount) == (other.getRows()));
-
- /*
- * //TODO: should we consider cardinality as well? return (this == other) ||
- * ((this.dCpu + this.dIo) == (other.getCpu() + other.getIo()));
- */
+ return (this == other) ||
+ ((this.cpu + this.io == other.getCpu() + other.getIo()) &&
+ (this.rowCount == other.getRows()));
}
public boolean isEqWithEpsilon(RelOptCost other) {
- return (this == other) || (Math.abs((this.rowCount) - (other.getRows())) < RelOptUtil.EPSILON);
- // Turn this one once we do the Algorithm selection in CBO
- /*
- * return (this == other) || (Math.abs((this.dCpu + this.dIo) -
- * (other.getCpu() + other.getIo())) < RelOptUtil.EPSILON);
- */
+ return (this == other) || (Math.abs((this.cpu + this.io) -
+ (other.getCpu() + other.getIo())) < RelOptUtil.EPSILON);
}
public RelOptCost minus(RelOptCost other) {
@@ -135,8 +123,8 @@
return this;
}
- return new HiveCost(this.rowCount - other.getRows(), this.cpu - other.getCpu(), this.io
- - other.getIo());
+ return new HiveCost(this.rowCount - other.getRows(), this.cpu - other.getCpu(),
+ this.io - other.getIo());
}
public RelOptCost multiplyBy(double factor) {
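The comparison semantics change here: isLe now orders costs by cpu+io first and falls back to cardinality only to break ties, instead of looking at row count alone. A self-contained check of that ordering follows; the cost vectors and the class name CostOrderDemo are hypothetical.

    public class CostOrderDemo {
      // a cost is {rows, cpu, io}; mirrors the isLe logic above without RelOptCost
      static boolean isLe(double[] a, double[] b) {
        return (a[1] + a[2] < b[1] + b[2])
            || (a[1] + a[2] == b[1] + b[2] && a[0] <= b[0]);
      }
      public static void main(String[] args) {
        double[] mapJoin    = {6000, 4000, 200};    // cheaper cpu+io
        double[] commonJoin = {6000, 3000, 5000};
        System.out.println(isLe(mapJoin, commonJoin));   // true: 4200 < 8000
        System.out.println(isLe(commonJoin, mapJoin));   // false
      }
    }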
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveVolcanoPlanner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveVolcanoPlanner.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveVolcanoPlanner.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfigContext;
/**
* Refinement of {@link org.apache.calcite.plan.volcano.VolcanoPlanner} for Hive.
@@ -34,12 +35,12 @@
private static final boolean ENABLE_COLLATION_TRAIT = true;
/** Creates a HiveVolcanoPlanner. */
- public HiveVolcanoPlanner() {
- super(HiveCost.FACTORY, null);
+ public HiveVolcanoPlanner(HiveConfigContext conf) {
+ super(HiveCost.FACTORY, conf);
}
- public static RelOptPlanner createPlanner() {
- final VolcanoPlanner planner = new HiveVolcanoPlanner();
+ public static RelOptPlanner createPlanner(HiveConfigContext conf) {
+ final VolcanoPlanner planner = new HiveVolcanoPlanner(conf);
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
if (ENABLE_COLLATION_TRAIT) {
planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
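The planner now carries a HiveConfigContext as its external Calcite context. A hedged sketch of how downstream code might recover it follows; it assumes HiveConfigContext is reachable through Context.unwrap, and the variable rel is hypothetical.

    // Hedged sketch (assumption, not shown in this patch):
    HiveConfigContext ctx =
        rel.getCluster().getPlanner().getContext().unwrap(HiveConfigContext.class);
    if (ctx != null) {
      // e.g. read the configuration it carries to decide between cost models
    }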
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 1672206)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java (working copy)
@@ -18,26 +18,160 @@
package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import com.google.common.collect.ImmutableList;
+
// Use this once we have Join Algorithm selection
public class HiveCostUtil {
- private static final double cpuCostInNanoSec = 1.0;
- private static final double netCostInNanoSec = 150 * cpuCostInNanoSec;
- private static final double localFSWriteCostInNanoSec = 4 * netCostInNanoSec;
- private static final double localFSReadCostInNanoSec = 4 * netCostInNanoSec;
- private static final double hDFSWriteCostInNanoSec = 10 * localFSWriteCostInNanoSec;
- @SuppressWarnings("unused")
-//Use this once we have Join Algorithm selection
- private static final double hDFSReadCostInNanoSec = 1.5 * localFSReadCostInNanoSec;
+ private static final double CPU_COST = 1.0;
+ private static final double NET_COST = 150.0 * CPU_COST;
+ private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
+ private static final double LOCAL_READ_COST = 4.0 * NET_COST;
+ private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST;
+ private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST;
+
public static RelOptCost computCardinalityBasedCost(HiveRelNode hr) {
return new HiveCost(hr.getRows(), 0, 0);
}
public static HiveCost computeCost(HiveTableScan t) {
double cardinality = t.getRows();
- return new HiveCost(cardinality, 0, hDFSWriteCostInNanoSec * cardinality * 0);
+ return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
}
+
+ public static double computeSortMergeCPUCost(
+ ImmutableList cardinalities,
+ ImmutableBitSet sorted) {
+ // Sort-merge join
+ double cpuCost = 0.0;
+ for (int i=0; i> relationInfos) {
+ // Sort-merge join
+ double ioCost = 0.0;
+ for (Pair relationInfo : relationInfos) {
+ ioCost += computeSortIOCost(relationInfo);
+ }
+ return ioCost;
+ }
+
+ public static double computeSortIOCost(Pair relationInfo) {
+ // Sort-merge join
+ double ioCost = 0.0;
+ double cardinality = relationInfo.left;
+ double averageTupleSize = relationInfo.right;
+ // Write cost
+ ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+ // Read cost
+ ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+ // Net transfer cost
+ ioCost += cardinality * averageTupleSize * NET_COST;
+ return ioCost;
+ }
+
+ public static double computeMapJoinCPUCost(
+ ImmutableList cardinalities,
+ ImmutableBitSet streaming) {
+ // Hash-join
+ double cpuCost = 0.0;
+ for (int i=0; i> relationInfos,
+ ImmutableBitSet streaming, int parallelism) {
+ // Hash-join
+ double ioCost = 0.0;
+ for (int i=0; i cardinalities,
+ ImmutableBitSet streaming) {
+ // Hash-join
+ double cpuCost = 0.0;
+ for (int i=0; i> relationInfos,
+ ImmutableBitSet streaming, int parallelism) {
+ // Hash-join
+ double ioCost = 0.0;
+ for (int i=0; i cardinalities) {
+ // Hash-join
+ double cpuCost = 0.0;
+ for (int i=0; i> relationInfos,
+ ImmutableBitSet streaming, int parallelism) {
+ // Hash-join
+ double ioCost = 0.0;
+ for (int i=0; i hiveNonPartitionCols;
+ private final ImmutableList<ColumnInfo> hivePartitionCols;
private final ImmutableMap<Integer, ColumnInfo> hiveNonPartitionColsMap;
private final ImmutableMap<Integer, ColumnInfo> hivePartitionColsMap;
- private final int noOfProjs;
+ private final ImmutableList<VirtualColumn> hiveVirtualCols;
+ private final int noOfNonVirtualCols;
final HiveConf hiveConf;
private double rowCount = -1;
@@ -67,37 +78,66 @@
PrunedPartitionList partitionList;
Map<String, PrunedPartitionList> partitionCache;
AtomicInteger noColsMissingStats;
+ private final String qbID;
protected static final Log LOG = LogFactory
.getLog(RelOptHiveTable.class
.getName());
- public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, String tblAlias, RelDataType rowType,
- Table hiveTblMetadata, List hiveNonPartitionCols,
- List hivePartitionCols, HiveConf hconf, Map partitionCache, AtomicInteger noColsMissingStats) {
+ public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, String tblAlias,
+ RelDataType rowType, Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols,
+ List<ColumnInfo> hivePartitionCols, List<VirtualColumn> hiveVirtualCols, HiveConf hconf,
+ Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats,
+ String qbID) {
super(calciteSchema, qualifiedTblName, rowType);
this.hiveTblMetadata = hiveTblMetadata;
this.tblAlias = tblAlias;
this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols);
- this.hiveNonPartitionColsMap = getColInfoMap(hiveNonPartitionCols, 0);
- this.hivePartitionColsMap = getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size());
- this.noOfProjs = hiveNonPartitionCols.size() + hivePartitionCols.size();
+ this.hiveNonPartitionColsMap = HiveCalciteUtil.getColInfoMap(hiveNonPartitionCols, 0);
+ this.hivePartitionCols = ImmutableList.copyOf(hivePartitionCols);
+ this.hivePartitionColsMap = HiveCalciteUtil.getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size());
+ this.noOfNonVirtualCols = hiveNonPartitionCols.size() + hivePartitionCols.size();
+ this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols);
this.hiveConf = hconf;
this.partitionCache = partitionCache;
this.noColsMissingStats = noColsMissingStats;
+ this.qbID = qbID;
}
- private static ImmutableMap getColInfoMap(List hiveCols,
- int startIndx) {
- Builder bldr = ImmutableMap. builder();
+ public RelOptHiveTable copy(RelDataType newRowType) {
+ // 1. Build map of column name to col index of original schema
+ // Assumption: Hive Table can not contain duplicate column names
+ Map<String, Integer> nameToColIndxMap = new HashMap<String, Integer>();
+ for (RelDataTypeField f : this.rowType.getFieldList()) {
+ nameToColIndxMap.put(f.getName(), f.getIndex());
+ }
- int indx = startIndx;
- for (ColumnInfo ci : hiveCols) {
- bldr.put(indx, ci);
- indx++;
+ // 2. Build nonPart/Part/Virtual column info for new RowSchema
+ List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<ColumnInfo>();
+ List<ColumnInfo> newHivePartitionCols = new ArrayList<ColumnInfo>();
+ List<VirtualColumn> newHiveVirtualCols = new ArrayList<VirtualColumn>();
+ Map<Integer, VirtualColumn> virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols,
+ this.noOfNonVirtualCols);
+ Integer originalColIndx;
+ ColumnInfo cInfo;
+ VirtualColumn vc;
+ for (RelDataTypeField f : newRowType.getFieldList()) {
+ originalColIndx = nameToColIndxMap.get(f.getName());
+ if ((cInfo = hiveNonPartitionColsMap.get(originalColIndx)) != null) {
+ newHiveNonPartitionCols.add(new ColumnInfo(cInfo));
+ } else if ((cInfo = hivePartitionColsMap.get(originalColIndx)) != null) {
+ newHivePartitionCols.add(new ColumnInfo(cInfo));
+ } else if ((vc = virtualColInfoMap.get(originalColIndx)) != null) {
+ newHiveVirtualCols.add(vc);
+ } else {
+ throw new RuntimeException("Copy encountered a column not seen in original TS");
}
+ }
- return bldr.build();
+ // 3. Build new Table
+ return new RelOptHiveTable(this.schema, this.name, this.tblAlias, newRowType,
+ this.hiveTblMetadata, newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols,
+ this.hiveConf, this.partitionCache, this.noColsMissingStats, qbID);
}
@Override
@@ -116,16 +156,57 @@
}
@Override
+ public List getCollationList() {
+ ImmutableList.Builder collationList = new ImmutableList.Builder();
+ for (Order sortColumn : this.hiveTblMetadata.getSortCols()) {
+ for (int i=0; i()
+ .add(RelCollationTraitDef.INSTANCE.canonize(
+ new HiveRelCollation(collationList.build())))
+ .build();
+ }
+
+ @Override
+ public RelDistribution getDistribution() {
+ ImmutableList.Builder columnPositions = new ImmutableList.Builder();
+ for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) {
+ for (int i=0; i rowCounts = StatsUtils.getBasicStatForPartitions(
- hiveTblMetadata, partitionList.getNotDeniedPartns(),
- StatsSetupConst.ROW_COUNT);
+ List rowCounts = StatsUtils.getBasicStatForPartitions(hiveTblMetadata,
+ partitionList.getNotDeniedPartns(), StatsSetupConst.ROW_COUNT);
rowCount = StatsUtils.getSumIgnoreNegatives(rowCounts);
} else {
@@ -144,8 +225,10 @@
}
public String getTableAlias() {
- // NOTE: Calcite considers tbls to be equal if their names are the same. Hence
- // we need to provide Calcite the fully qualified table name (dbname.tblname)
+ // NOTE: Calcite considers tbls to be equal if their names are the same.
+ // Hence
+ // we need to provide Calcite the fully qualified table name
+ // (dbname.tblname)
// and not the user provided aliases.
// However in HIVE DB name can not appear in select list; in case of join
// where table names differ only in DB name, Hive would require user
@@ -173,16 +256,21 @@
public void computePartitionList(HiveConf conf, RexNode pruneNode) {
try {
- if (!hiveTblMetadata.isPartitioned() || pruneNode == null || InputFinder.bits(pruneNode).length() == 0 ) {
- // there is no predicate on partitioning column, we need all partitions in this case.
- partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), partitionCache);
+ if (!hiveTblMetadata.isPartitioned() || pruneNode == null
+ || InputFinder.bits(pruneNode).length() == 0) {
+ // there is no predicate on partitioning column, we need all partitions
+ // in this case.
+ partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(),
+ partitionCache);
return;
}
// We have valid pruning expressions, only retrieve qualifying partitions
- ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(), true, getRelOptSchema().getTypeFactory()));
+ ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(),
+ true, this.getRelOptSchema().getTypeFactory()));
- partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(), partitionCache);
+ partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(),
+ partitionCache);
} catch (HiveException he) {
throw new RuntimeException(he);
}
@@ -289,10 +377,10 @@
if (colNamesFailedStats.isEmpty() && !partColNamesThatRqrStats.isEmpty()) {
ColStatistics cStats = null;
for (int i = 0; i < partColNamesThatRqrStats.size(); i++) {
- cStats = new ColStatistics(hiveTblMetadata.getTableName(),
- partColNamesThatRqrStats.get(i), hivePartitionColsMap.get(
- partColIndxsThatRqrStats.get(i)).getTypeName());
- cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),partColNamesThatRqrStats.get(i)));
+ cStats = new ColStatistics(hiveTblMetadata.getTableName(), partColNamesThatRqrStats.get(i),
+ hivePartitionColsMap.get(partColIndxsThatRqrStats.get(i)).getTypeName());
+ cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),
+ partColNamesThatRqrStats.get(i)));
hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats);
}
}
@@ -325,7 +413,7 @@
}
} else {
List<Integer> pILst = new ArrayList<Integer>();
- for (Integer i = 0; i < noOfProjs; i++) {
+ for (Integer i = 0; i < noOfNonVirtualCols; i++) {
pILst.add(i);
}
updateColStats(new HashSet(pILst));
@@ -338,10 +426,8 @@
}
/*
- * use to check if a set of columns are all partition columns.
- * true only if:
- * - all columns in BitSet are partition
- * columns.
+ * use to check if a set of columns are all partition columns. true only if: -
+ * all columns in BitSet are partition columns.
*/
public boolean containsPartitionColumnsOnly(ImmutableBitSet cols) {
@@ -352,4 +438,32 @@
}
return true;
}
+
+ public List<VirtualColumn> getVirtualCols() {
+ return this.hiveVirtualCols;
}
+
+ public List<ColumnInfo> getPartColumns() {
+ return this.hivePartitionCols;
+ }
+
+ public List<ColumnInfo> getNonPartColumns() {
+ return this.hiveNonPartitionCols;
+ }
+
+ public String getQBID() {
+ return qbID;
+ }
+
+ public int getNoOfNonVirtualCols() {
+ return noOfNonVirtualCols;
+ }
+
+ public Map<Integer, ColumnInfo> getPartColInfoMap() {
+ return hivePartitionColsMap;
+ }
+
+ public Map<Integer, ColumnInfo> getNonPartColInfoMap() {
+ return hiveNonPartitionColsMap;
+ }
+}
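RelOptHiveTable.copy() above routes every retained field back into the non-partition, partition, or virtual column lists by its index in the original row type. The following self-contained sketch illustrates that classification step; the column names and the class name ColumnClassificationDemo are hypothetical.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnClassificationDemo {
      public static void main(String[] args) {
        Map<Integer, String> nonPartition = new HashMap<Integer, String>();
        nonPartition.put(0, "key");
        nonPartition.put(1, "value");
        Map<Integer, String> partition = new HashMap<Integer, String>();
        partition.put(2, "dt");
        Map<Integer, String> virtual = new HashMap<Integer, String>();
        virtual.put(3, "BLOCK__OFFSET__INSIDE__FILE");

        List<Integer> retained = Arrays.asList(1, 2);   // indexes kept by the pruned row type
        for (int idx : retained) {
          if (nonPartition.containsKey(idx)) {
            System.out.println(idx + " -> non-partition column " + nonPartition.get(idx));
          } else if (partition.containsKey(idx)) {
            System.out.println(idx + " -> partition column " + partition.get(idx));
          } else if (virtual.containsKey(idx)) {
            System.out.println(idx + " -> virtual column " + virtual.get(idx));
          } else {
            throw new RuntimeException("column not seen in original TS");
          }
        }
      }
    }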
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (.../https://svn.apache.org/repos/asf/hive/trunk) (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java (revision 1672174)
@@ -0,0 +1,891 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistribution.Type;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.apache.hadoop.hive.ql.parse.JoinCond;
+import org.apache.hadoop.hive.ql.parse.JoinType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
+import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.UnionDesc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class HiveOpConverter {
+
+ private static final Log LOG = LogFactory.getLog(HiveOpConverter.class);
+
+ public static enum HIVEAGGOPMODE {
+ NO_SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan1MR
+ SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan2MR
+ NO_SKEW_MAP_SIDE_AGG, // Corresponds to SemAnalyzer
+ // genGroupByPlanMapAggrNoSkew
+ SKEW_MAP_SIDE_AGG // Corresponds to SemAnalyzer genGroupByPlanMapAggr2MR
+ };
+
+ // TODO: remove this after stashing only rqd pieces from opconverter
+ private final SemanticAnalyzer semanticAnalyzer;
+ private final HiveConf hiveConf;
+ private final UnparseTranslator unparseTranslator;
+ private final Map<String, Operator<? extends OperatorDesc>> topOps;
+ private final boolean strictMode;
+ private int reduceSinkTagGenerator;
+
+ public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
+ UnparseTranslator unparseTranslator, Map<String, Operator<? extends OperatorDesc>> topOps,
+ boolean strictMode) {
+ this.semanticAnalyzer = semanticAnalyzer;
+ this.hiveConf = hiveConf;
+ this.unparseTranslator = unparseTranslator;
+ this.topOps = topOps;
+ this.strictMode = strictMode;
+ this.reduceSinkTagGenerator = 0;
+ }
+
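+  /**
+   * Intermediate state threaded through the conversion: the table alias, the
+   * operator(s) produced so far, and the position-to-virtual-column mapping of
+   * their output schema.
+   */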
+ static class OpAttr {
+ final String tabAlias;
+    ImmutableList<Operator> inputs;
+    ImmutableMap<Integer, VirtualColumn> vcolMap;
+
+    OpAttr(String tabAlias, Map<Integer, VirtualColumn> vcolMap, Operator... inputs) {
+ this.tabAlias = tabAlias;
+ this.vcolMap = ImmutableMap.copyOf(vcolMap);
+ this.inputs = ImmutableList.copyOf(inputs);
+ }
+
+ private OpAttr clone(Operator... inputs) {
+ return new OpAttr(tabAlias, this.vcolMap, inputs);
+ }
+ }
+
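+  /**
+   * Entry point of the conversion: dispatches on the root RelNode and returns
+   * the root of the generated Hive operator tree.
+   */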
+ public Operator convert(RelNode root) throws SemanticException {
+ OpAttr opAf = dispatch(root);
+ return opAf.inputs.get(0);
+ }
+
+ OpAttr dispatch(RelNode rn) throws SemanticException {
+ if (rn instanceof HiveTableScan) {
+ return visit((HiveTableScan) rn);
+ } else if (rn instanceof HiveProject) {
+ return visit((HiveProject) rn);
+ } else if (rn instanceof HiveJoin) {
+ return visit((HiveJoin) rn);
+ } else if (rn instanceof SemiJoin) {
+ SemiJoin sj = (SemiJoin) rn;
+ HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(),
+ sj.getCondition(), sj.getJoinType(), true);
+ return visit(hj);
+ } else if (rn instanceof HiveFilter) {
+ return visit((HiveFilter) rn);
+ } else if (rn instanceof HiveSort) {
+ return visit((HiveSort) rn);
+ } else if (rn instanceof HiveUnion) {
+ return visit((HiveUnion) rn);
+ } else if (rn instanceof LogicalExchange) {
+ return visit((LogicalExchange) rn);
+ } else if (rn instanceof HiveAggregate) {
+ return visit((HiveAggregate) rn);
+ }
+    LOG.error(rn.getClass().getCanonicalName() + " operator translation not supported"
+ + " yet in return path.");
+ return null;
+ }
+
+ /**
+   * TODO: 1. PPD needs to get pushed into TS
+ *
+ * @param scanRel
+ * @return
+ */
+ OpAttr visit(HiveTableScan scanRel) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName()
+ + " with row type: [" + scanRel.getRowType() + "]");
+ }
+
+ RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable();
+
+ // 1. Setup TableScan Desc
+ // 1.1 Build col details used by scan
+    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>();
+    List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>(ht.getVirtualCols());
+    Map<Integer, VirtualColumn> hiveScanVColMap = new HashMap<Integer, VirtualColumn>();
+    List<String> partColNames = new ArrayList<String>();
+    List<Integer> neededColumnIDs = new ArrayList<Integer>();
+    List<String> neededColumns = new ArrayList<String>();
+
+    Map<Integer, VirtualColumn> posToVColMap = HiveCalciteUtil.getVColsMap(virtualCols,
+        ht.getNoOfNonVirtualCols());
+    Map<Integer, ColumnInfo> posToPartColInfo = ht.getPartColInfoMap();
+    Map<Integer, ColumnInfo> posToNonPartColInfo = ht.getNonPartColInfoMap();
+    List<Integer> neededColIndxsFrmReloptHT = scanRel.getNeededColIndxsFrmReloptHT();
+    List<String> scanColNames = scanRel.getRowType().getFieldNames();
+ String tableAlias = ht.getTableAlias();
+
+ String colName;
+ ColumnInfo colInfo;
+ VirtualColumn vc;
+ Integer posInRHT;
+
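+    // Classify every needed column as virtual, partition or regular, keeping
+    // track of its name, its position in the RelOptHiveTable and its
+    // ColumnInfo for the TableScan row schema.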
+ for (int i = 0; i < neededColIndxsFrmReloptHT.size(); i++) {
+ colName = scanColNames.get(i);
+ posInRHT = neededColIndxsFrmReloptHT.get(i);
+ if (posToVColMap.containsKey(posInRHT)) {
+ vc = posToVColMap.get(posInRHT);
+ virtualCols.add(vc);
+ colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, vc.getIsHidden());
+ hiveScanVColMap.put(i, vc);
+ } else if (posToPartColInfo.containsKey(posInRHT)) {
+ partColNames.add(colName);
+ colInfo = posToPartColInfo.get(posInRHT);
+ } else {
+ colInfo = posToNonPartColInfo.get(posInRHT);
+ }
+ neededColumnIDs.add(posInRHT);
+ neededColumns.add(colName);
+ colInfos.add(colInfo);
+ }
+
+ // 1.2 Create TableScanDesc
+ TableScanDesc tsd = new TableScanDesc(tableAlias, virtualCols, ht.getHiveTableMD());
+
+ // 1.3. Set Partition cols in TSDesc
+ tsd.setPartColumns(partColNames);
+
+ // 1.4. Set needed cols in TSDesc
+ tsd.setNeededColumnIDs(neededColumnIDs);
+ tsd.setNeededColumns(neededColumns);
+
+ // 2. Setup TableScan
+ TableScanOperator ts = (TableScanOperator) OperatorFactory.get(tsd, new RowSchema(colInfos));
+
+ topOps.put(ht.getQBID(), ts);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + ts + " with row schema: [" + ts.getSchema() + "]");
+ }
+
+ return new OpAttr(tableAlias, hiveScanVColMap, ts);
+ }
+
+ OpAttr visit(HiveProject projectRel) throws SemanticException {
+ OpAttr inputOpAf = dispatch(projectRel.getInput());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + projectRel.getId() + ":"
+ + projectRel.getRelTypeName() + " with row type: [" + projectRel.getRowType() + "]");
+ }
+
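+    // Convert each projected Calcite expression into an ExprNodeDesc; window
+    // function invocations encountered on the way are gathered into a
+    // WindowingSpec so that a PTF operator can be generated underneath the
+    // Select.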
+ WindowingSpec windowingSpec = new WindowingSpec();
+    List<ExprNodeDesc> exprCols = new ArrayList<ExprNodeDesc>();
+ for (int pos = 0; pos < projectRel.getChildExps().size(); pos++) {
+ ExprNodeConverter converter = new ExprNodeConverter(inputOpAf.tabAlias, projectRel
+ .getRowType().getFieldNames().get(pos), projectRel.getInput().getRowType(),
+ projectRel.getRowType(), false, projectRel.getCluster().getTypeFactory());
+ exprCols.add((ExprNodeDesc) projectRel.getChildExps().get(pos).accept(converter));
+ if (converter.getWindowFunctionSpec() != null) {
+ windowingSpec.addWindowFunction(converter.getWindowFunctionSpec());
+ }
+ }
+ if (windowingSpec.getWindowExpressions() != null
+ && !windowingSpec.getWindowExpressions().isEmpty()) {
+ inputOpAf = genPTF(inputOpAf, windowingSpec);
+ }
+ // TODO: is this a safe assumption (name collision, external names...)
+    List<String> exprNames = new ArrayList<String>(projectRel.getRowType().getFieldNames());
+ SelectDesc sd = new SelectDesc(exprCols, exprNames);
+    Pair<ArrayList<ColumnInfo>, Map<Integer, VirtualColumn>> colInfoVColPair = createColInfos(
+ projectRel.getChildExps(), exprCols, exprNames, inputOpAf);
+ SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(sd, new RowSchema(
+ colInfoVColPair.getKey()), inputOpAf.inputs.get(0));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + selOp + " with row schema: [" + selOp.getSchema() + "]");
+ }
+
+ return new OpAttr(inputOpAf.tabAlias, colInfoVColPair.getValue(), selOp);
+ }
+
+ OpAttr visit(HiveJoin joinRel) throws SemanticException {
+ // 1. Convert inputs
+ OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()];
+    List<Operator<?>> children = new ArrayList<Operator<?>>(joinRel.getInputs().size());
+ for (int i = 0; i < inputs.length; i++) {
+ inputs[i] = dispatch(joinRel.getInput(i));
+ children.add(inputs[i].inputs.get(0));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + joinRel.getId() + ":" + joinRel.getRelTypeName()
+ + " with row type: [" + joinRel.getRowType() + "]");
+ }
+
+ // 2. Convert join condition
+ JoinPredicateInfo joinPredInfo = JoinPredicateInfo.constructJoinPredicateInfo(joinRel);
+
+ // 3. Extract join keys from condition
+ ExprNodeDesc[][] joinKeys = extractJoinKeys(joinPredInfo, joinRel.getInputs());
+
+ // 4. Generate Join operator
+ JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys);
+
+ // 5. TODO: Extract condition for non-equi join elements (if any) and
+ // add it
+
+ // 6. Virtual columns
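+    // Virtual column positions coming from the non-first inputs are shifted by
+    // the schema width of the inputs on their left; a LEFTSEMI join only
+    // projects the left input, so no shifting is needed in that case.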
+    Map<Integer, VirtualColumn> vcolMap = new HashMap<Integer, VirtualColumn>();
+ vcolMap.putAll(inputs[0].vcolMap);
+ if (extractJoinType(joinRel) != JoinType.LEFTSEMI) {
+ int shift = inputs[0].inputs.get(0).getSchema().getSignature().size();
+ for (int i = 1; i < inputs.length; i++) {
+ vcolMap.putAll(HiveCalciteUtil.shiftVColsMap(inputs[i].vcolMap, shift));
+ shift += inputs[i].inputs.get(0).getSchema().getSignature().size();
+ }
+ }
+
+    // 7. Return result
+ return new OpAttr(null, vcolMap, joinOp);
+ }
+
+ OpAttr visit(HiveAggregate aggRel) throws SemanticException {
+ OpAttr inputOpAf = dispatch(aggRel.getInput());
+ return HiveGBOpConvUtil.translateGB(inputOpAf, aggRel, hiveConf);
+ }
+
+ OpAttr visit(HiveSort sortRel) throws SemanticException {
+ OpAttr inputOpAf = dispatch(sortRel.getInput());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+ + " with row type: [" + sortRel.getRowType() + "]");
+ if (sortRel.getCollation() == RelCollations.EMPTY) {
+ LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+ + " consists of limit");
+ } else if (sortRel.fetch == null) {
+ LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+ + " consists of sort");
+ } else {
+ LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+ + " consists of sort+limit");
+ }
+ }
+
+    Operator<?> inputOp = inputOpAf.inputs.get(0);
+    Operator<?> resultOp = inputOpAf.inputs.get(0);
+ // 1. If we need to sort tuples based on the value of some
+ // of their columns
+ if (sortRel.getCollation() != RelCollations.EMPTY) {
+
+ // In strict mode, in the presence of order by, limit must be
+ // specified
+ if (strictMode && sortRel.fetch == null) {
+ throw new SemanticException(ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg());
+ }
+
+ // 1.a. Extract order for each column from collation
+ // Generate sortCols and order
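+      // The order string carries one character per sort key: '+' for ascending
+      // and '-' for descending, as expected by the reduce sink.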
+      List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+ StringBuilder order = new StringBuilder();
+ for (RelCollation collation : sortRel.getCollationList()) {
+ for (RelFieldCollation sortInfo : collation.getFieldCollations()) {
+ int sortColumnPos = sortInfo.getFieldIndex();
+ ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature()
+ .get(sortColumnPos));
+ ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(),
+ columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol());
+ sortCols.add(sortColumn);
+ if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) {
+ order.append("-");
+ } else {
+ order.append("+");
+ }
+ }
+ }
+ // Use only 1 reducer for order by
+ int numReducers = 1;
+
+ // 1.b. Generate reduce sink and project operator
+ resultOp = genReduceSinkAndBacktrackSelect(resultOp,
+          sortCols.toArray(new ExprNodeDesc[sortCols.size()]), -1, new ArrayList<ExprNodeDesc>(),
+ order.toString(), numReducers, Operation.NOT_ACID, strictMode);
+ }
+
+ // 2. If we need to generate limit
+ if (sortRel.fetch != null) {
+ int limit = RexLiteral.intValue(sortRel.fetch);
+ LimitDesc limitDesc = new LimitDesc(limit);
+ // TODO: Set 'last limit' global property
+      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
+ resultOp = (LimitOperator) OperatorFactory.getAndMakeChild(limitDesc,
+ new RowSchema(cinfoLst), resultOp);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
+ }
+ }
+
+ // 3. Return result
+ return inputOpAf.clone(resultOp);
+ }
+
+ /**
+ * TODO: 1) isSamplingPred 2) sampleDesc 3) isSortedFilter
+ */
+ OpAttr visit(HiveFilter filterRel) throws SemanticException {
+ OpAttr inputOpAf = dispatch(filterRel.getInput());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + filterRel.getId() + ":" + filterRel.getRelTypeName()
+ + " with row type: [" + filterRel.getRowType() + "]");
+ }
+
+ ExprNodeDesc filCondExpr = filterRel.getCondition().accept(
+ new ExprNodeConverter(inputOpAf.tabAlias, filterRel.getInput().getRowType(), false,
+ filterRel.getCluster().getTypeFactory()));
+ FilterDesc filDesc = new FilterDesc(filCondExpr, false);
+    ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOpAf.inputs.get(0));
+ FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc, new RowSchema(
+ cinfoLst), inputOpAf.inputs.get(0));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + filOp + " with row schema: [" + filOp.getSchema() + "]");
+ }
+
+ return inputOpAf.clone(filOp);
+ }
+
+ OpAttr visit(HiveUnion unionRel) throws SemanticException {
+ // 1. Convert inputs
+ OpAttr[] inputs = new OpAttr[unionRel.getInputs().size()];
+ for (int i = 0; i < inputs.length; i++) {
+ inputs[i] = dispatch(unionRel.getInput(i));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + unionRel.getId() + ":" + unionRel.getRelTypeName()
+ + " with row type: [" + unionRel.getRowType() + "]");
+ }
+
+ // 2. Create a new union operator
+ UnionDesc unionDesc = new UnionDesc();
+ unionDesc.setNumInputs(inputs.length);
+    ArrayList<ColumnInfo> cinfoLst = createColInfos(inputs[0].inputs.get(0));
+    Operator<?>[] children = new Operator<?>[inputs.length];
+ for (int i = 0; i < children.length; i++) {
+ children[i] = inputs[i].inputs.get(0);
+ }
+    Operator<? extends OperatorDesc> unionOp = OperatorFactory.getAndMakeChild(unionDesc,
+ new RowSchema(cinfoLst), children);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + unionOp + " with row schema: [" + unionOp.getSchema() + "]");
+ }
+
+ // 3. Return result
+ return inputs[0].clone(unionOp);
+ }
+
+ OpAttr visit(LogicalExchange exchangeRel) throws SemanticException {
+ OpAttr inputOpAf = dispatch(exchangeRel.getInput());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":"
+ + exchangeRel.getRelTypeName() + " with row type: [" + exchangeRel.getRowType() + "]");
+ }
+
+ RelDistribution distribution = exchangeRel.getDistribution();
+ if (distribution.getType() != Type.HASH_DISTRIBUTED) {
+ throw new SemanticException("Only hash distribution supported for LogicalExchange");
+ }
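+    // Translate every hash-distribution key into a column reference over the
+    // input schema; these references become the keys of the ReduceSink
+    // generated below.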
+ ExprNodeDesc[] expressions = new ExprNodeDesc[distribution.getKeys().size()];
+ for (int i = 0; i < distribution.getKeys().size(); i++) {
+ int key = distribution.getKeys().get(i);
+ ColumnInfo colInfo = inputOpAf.inputs.get(0).getSchema().getSignature().get(key);
+ ExprNodeDesc column = new ExprNodeColumnDesc(colInfo);
+ expressions[i] = column;
+ }
+
+ ReduceSinkOperator rsOp = genReduceSink(inputOpAf.inputs.get(0), expressions,
+ reduceSinkTagGenerator++, -1, Operation.NOT_ACID, strictMode);
+
+ return inputOpAf.clone(rsOp);
+ }
+
+ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticException {
+    Operator<?> input = inputOpAf.inputs.get(0);
+
+ wSpec.validateAndMakeEffective();
+ WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+ RowResolver rr = new RowResolver();
+ for (ColumnInfo ci : input.getSchema().getSignature()) {
+ rr.put(ci.getTabAlias(), ci.getInternalName(), ci);
+ }
+
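+    // Each iteration consumes one windowing group: a ReduceSink repartitions
+    // and sorts the data on the group's partition/order keys, a backtracking
+    // Select restores the input columns, and a PTF operator evaluating the
+    // window functions is placed on top.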
+ while (groups.hasNext()) {
+ wSpec = groups.next(hiveConf, semanticAnalyzer, unparseTranslator, rr);
+
+ // 1. Create RS and backtrack Select operator on top
+      ArrayList<ExprNodeDesc> keyCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+ StringBuilder order = new StringBuilder();
+
+ for (PartitionExpression partCol : wSpec.getQueryPartitionSpec().getExpressions()) {
+ ExprNodeDesc partExpr = semanticAnalyzer.genExprNodeDesc(partCol.getExpression(), rr);
+ if (ExprNodeDescUtils.indexOf(partExpr, partCols) < 0) {
+ keyCols.add(partExpr);
+ partCols.add(partExpr);
+ order.append('+');
+ }
+ }
+
+ if (wSpec.getQueryOrderSpec() != null) {
+ for (OrderExpression orderCol : wSpec.getQueryOrderSpec().getExpressions()) {
+ ExprNodeDesc orderExpr = semanticAnalyzer.genExprNodeDesc(orderCol.getExpression(), rr);
+ char orderChar = orderCol.getOrder() == PTFInvocationSpec.Order.ASC ? '+' : '-';
+ int index = ExprNodeDescUtils.indexOf(orderExpr, keyCols);
+ if (index >= 0) {
+ order.setCharAt(index, orderChar);
+ continue;
+ }
+ keyCols.add(orderExpr);
+ order.append(orderChar);
+ }
+ }
+
+ SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input,
+ keyCols.toArray(new ExprNodeDesc[keyCols.size()]), reduceSinkTagGenerator++, partCols,
+ order.toString(), -1, Operation.NOT_ACID, strictMode);
+
+ // 2. Finally create PTF
+ PTFTranslator translator = new PTFTranslator();
+ PTFDesc ptfDesc = translator.translate(wSpec, semanticAnalyzer, hiveConf, rr,
+ unparseTranslator);
+ RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+
+      Operator<?> ptfOp = OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]");
+ }
+
+ // 3. Prepare for next iteration (if any)
+ rr = ptfOpRR;
+ input = ptfOp;
+ }
+
+ return inputOpAf.clone(input);
+ }
+
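+  /**
+   * Extracts, for each input of the join, the ExprNodeDesc of every equi-join
+   * key, indexed as joinKeys[input][predicate].
+   */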
+  private ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo, List<RelNode> inputs) {
+ ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.size()][];
+ for (int i = 0; i < inputs.size(); i++) {
+ joinKeys[i] = new ExprNodeDesc[joinPredInfo.getEquiJoinPredicateElements().size()];
+ for (int j = 0; j < joinPredInfo.getEquiJoinPredicateElements().size(); j++) {
+ JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.getEquiJoinPredicateElements().get(j);
+        RexNode key = joinLeafPredInfo.getJoinKeyExprs(i).get(0);
+        joinKeys[i][j] = convertToExprNode(key, inputs.get(i), null);
+ }
+ }
+ return joinKeys;
+ }
+
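+  /**
+   * Generates a ReduceSinkOperator over the given keys and places on top of it
+   * a Select operator that backtracks the reduce sink columns to the original
+   * input expressions.
+   */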
+  private static SelectOperator genReduceSinkAndBacktrackSelect(Operator<?> input,
+      ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols, String order,
+ int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException {
+ // 1. Generate RS operator
+ ReduceSinkOperator rsOp = genReduceSink(input, keys, tag, partitionCols, order, numReducers,
+ acidOperation, strictMode);
+
+ // 2. Generate backtrack Select operator
+    Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(rsOp, input);
+    SelectDesc selectDesc = new SelectDesc(new ArrayList<ExprNodeDesc>(descriptors.values()),
+        new ArrayList<String>(descriptors.keySet()));
+    ArrayList<ColumnInfo> cinfoLst = createColInfos(input);
+ SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(selectDesc,
+ new RowSchema(cinfoLst), rsOp);
+ selectOp.setColumnExprMap(descriptors);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated " + selectOp + " with row schema: [" + selectOp.getSchema() + "]");
+ }
+
+ return selectOp;
+ }
+
+  private static ReduceSinkOperator genReduceSink(Operator<?> input, ExprNodeDesc[] keys, int tag,
+ int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException {
+    return genReduceSink(input, keys, tag, new ArrayList<ExprNodeDesc>(), "", numReducers,
+ acidOperation, strictMode);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static ReduceSinkOperator genReduceSink(Operator<?> input, ExprNodeDesc[] keys, int tag,
+      ArrayList<ExprNodeDesc> partitionCols, String order, int numReducers,
+ Operation acidOperation, boolean strictMode) throws SemanticException {
+ Operator dummy = Operator.createDummy(); // dummy for backtracking
+ dummy.setParentOperators(Arrays.asList(input));
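+    // The dummy operator, parented to the input, is only used so that key and
+    // value expressions can be backtracked to the input's own column
+    // expressions before the actual ReduceSink exists.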
+
+    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
+
+ // Compute join keys and store in reduceKeys
+ for (ExprNodeDesc key : keys) {
+ reduceKeys.add(key);
+ reduceKeysBack.add(ExprNodeDescUtils.backtrack(key, dummy, input));
+ }
+
+ // Walk over the input schema and copy in the output
+    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+ List