commit b3ac4b1436c99900804d2ba54140c705affddf4f Author: Ivan Suller Date: Mon Aug 5 12:04:10 2019 +0200 HIVE-22083 Change-Id: I9ace105f01302b7c829c7da3d7974c5ed39c1e7d diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 2d76848413..a0ad5fdc4b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -115,7 +115,7 @@ */ protected transient List[] rowContainerStandardObjectInspectors; - protected transient Byte[] order; // order in which the results should + protected transient byte[] order; // order in which the results should // be output protected transient JoinCondDesc[] condn; protected transient boolean[] nullsafes; @@ -205,11 +205,10 @@ public CommonJoinOperator(CommonJoinOperator clone) { this.needsPostEvaluation = clone.needsPostEvaluation; } - private ObjectInspector getJoinOutputObjectInspector( - Byte[] order, List[] aliasToObjectInspectors, T conf) { + private ObjectInspector getJoinOutputObjectInspector() { List structFieldObjectInspectors = new ArrayList(); - for (Byte alias : order) { - List oiList = getValueObjectInspectors(alias, aliasToObjectInspectors); + for (byte o : order) { + List oiList = getValueObjectInspectors(o, joinValuesStandardObjectInspectors); if (oiList != null && !oiList.isEmpty()) { structFieldObjectInspectors.addAll(oiList); } @@ -278,13 +277,13 @@ protected void initializeOp(Configuration hconf) throws HiveException { rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors; } else { List[] rowContainerObjectInspectors = new List[tagLen]; - for (Byte alias : order) { + for (byte o : order) { ArrayList rcOIs = new ArrayList(); - rcOIs.addAll(joinValuesObjectInspectors[alias]); + rcOIs.addAll(joinValuesObjectInspectors[o]); // for each alias, add object inspector for short as the last element rcOIs.add( PrimitiveObjectInspectorFactory.writableShortObjectInspector); - rowContainerObjectInspectors[alias] = rcOIs; + rowContainerObjectInspectors[o] = rcOIs; } rowContainerStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(rowContainerObjectInspectors, NOTSKIPBIGTABLE, tagLen); @@ -305,8 +304,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { // construct spill table serde which is used if input is too // large to fit into main memory. byte pos = 0; - for (Byte alias : order) { - int sz = conf.getExprs().get(alias).size(); + for (byte o : order) { + int sz = conf.getExprs().get(o).size(); ArrayList nr = new ArrayList(sz); for (int j = 0; j < sz; j++) { @@ -323,7 +322,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { // there should be only 1 dummy object in the RowContainer RowContainer> values = JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], - alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter); + o, 1, spillTableDesc, conf, !hasFilter(pos), reporter); values.addRow(dummyObj[pos]); dummyObjVectors[pos] = values; @@ -332,7 +331,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { // e.g., the output columns does not contains the input table RowContainer> rc = JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], - alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter); + o, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter); storage[pos] = rc; pos++; @@ -357,8 +356,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { } offsets[numAliases] = sum; - outputObjInspector = getJoinOutputObjectInspector(order, - joinValuesStandardObjectInspectors, conf); + outputObjInspector = getJoinOutputObjectInspector(); for( int i = 0; i < condn.length; i++ ) { if(condn[i].getType() == JoinDesc.LEFT_SEMI_JOIN) { @@ -891,8 +889,8 @@ protected void checkAndGenObject() throws HiveException { boolean hasNulls = false; // Will be true if there are null entries boolean allOne = true; for (int i = 0; i < numAliases; i++) { - Byte alias = order[i]; - AbstractRowContainer> alw = storage[alias]; + byte o = order[i]; + AbstractRowContainer> alw = storage[o]; if (!alw.isSingleRow()) { allOne = false; @@ -920,8 +918,8 @@ protected void checkAndGenObject() throws HiveException { boolean mayHasMoreThanOne = false; boolean hasEmpty = false; for (int i = 0; i < numAliases; i++) { - Byte alias = order[i]; - AbstractRowContainer> alw = storage[alias]; + byte o = order[i]; + AbstractRowContainer> alw = storage[o]; if (noOuterJoin) { if (!alw.hasRows()) { @@ -934,7 +932,7 @@ protected void checkAndGenObject() throws HiveException { hasEmpty = true; alw.addRow(dummyObj[i]); } else if (!hasEmpty && alw.isSingleRow()) { - if (hasAnyFiltered(alias, alw.rowIter().first())) { + if (hasAnyFiltered(o, alw.rowIter().first())) { hasEmpty = true; } } else { @@ -943,7 +941,7 @@ protected void checkAndGenObject() throws HiveException { AbstractRowContainer.RowIterator> iter = alw.rowIter(); for (List row = iter.first(); row != null; row = iter.next()) { reportProgress(); - if (hasAnyFiltered(alias, row)) { + if (hasAnyFiltered(o, row)) { hasEmpty = true; break; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index 36c93350d5..0e8ea33931 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -49,8 +49,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.util.ReflectionUtils; @@ -90,7 +88,6 @@ */ private transient List[] joinFilterObjectInspectors; - private transient Byte[] order; // order in which the results should protected Configuration hconf; protected transient MapJoinPersistableTableContainer[] mapJoinTables; @@ -131,7 +128,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { // for small tables only; so get the big table position first posBigTableAlias = conf.getPosBigTable(); - order = conf.getTagOrder(); + byte[] order = conf.getTagOrder(); // initialize some variables, which used to be initialized in CommonJoinOperator this.hconf = hconf; @@ -159,7 +156,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { inputObjInspectors, posBigTableAlias, tagLen); if (!conf.isNoOuterJoin()) { - for (Byte alias : order) { + for (byte alias : order) { if (alias == posBigTableAlias || joinValues[alias] == null) { continue; } @@ -183,7 +180,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { null); SerDeUtils.initializeSerDe(keySerde, null, keyTableDesc.getProperties(), null); MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false); - for (Byte pos : order) { + for (byte pos : order) { if (pos == posBigTableAlias) { continue; } @@ -204,26 +201,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { return mapJoinTables; } - private static List[] getStandardObjectInspectors( - List[] aliasToObjectInspectors, int maxTag) { - @SuppressWarnings("unchecked") - List[] result = new List[maxTag]; - for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) { - List oiList = aliasToObjectInspectors[alias]; - if (oiList == null) { - continue; - } - ArrayList fieldOIList = new ArrayList(oiList.size()); - for (int i = 0; i < oiList.size(); i++) { - fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i), - ObjectInspectorCopyOption.WRITABLE)); - } - result[alias] = fieldOIList; - } - return result; - - } - /* * This operator only process small tables Read the key/value pairs Load them into hashtable */ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 1aae142ba7..3b04b1e3dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -127,7 +127,7 @@ public static int populateJoinKeyValue(List[] outMap, public static int populateJoinKeyValue(List[] outMap, Map> inputMap, - Byte[] order, + byte[] order, int posBigTableAlias, Configuration conf) throws HiveException { int total = 0; for (Entry> e : inputMap.entrySet()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java index 9e7a6b2847..53fb022d18 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -90,7 +89,6 @@ private LongWritable skewjoinFollowupJobs; - private final boolean noOuterJoin; Configuration hconf = null; List dummyKey = null; String taskId; @@ -103,7 +101,6 @@ public SkewJoinHandler(CommonJoinOperator joinOp) { this.joinOp = joinOp; numAliases = joinOp.numAliases; conf = joinOp.getConf(); - noOuterJoin = joinOp.noOuterJoin; } public void initiliaze(Configuration hconf) { @@ -119,7 +116,7 @@ public void initiliaze(Configuration hconf) { int[][] filterMap = desc.getFilterMap(); for (int i = 0; i < numAliases; i++) { - Byte alias = conf.getTagOrder()[i]; + byte alias = conf.getTagOrder()[i]; List skewTableKeyInspectors = new ArrayList(); StructObjectInspector soi = (StructObjectInspector) joinOp.inputObjInspectors[alias]; StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index 78ae9a18e8..e9d5503a38 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -73,7 +73,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()]; byte tag = conf.getTag(); inputOIs[tag] = inputObjInspectors[0]; - conf.setTagOrder(new Byte[]{ tag }); + conf.setTagOrder(new byte[] {tag}); numReplication = (short) hconf.getInt(MAPRED_FILE_REPLICATION, DEFAULT_REPLICATION); htsOperator.setConf(conf); htsOperator.initialize(hconf, inputOIs); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java index e80a3e20e3..3e0139a1cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java @@ -118,8 +118,8 @@ public VectorizationContext getInputVectorizationContext() { List keyDesc = desc.getKeys().get(posBigTable); List bigTableExprs = desc.getExprs().get(posBigTable); - Byte[] order = desc.getTagOrder(); - Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); + byte[] order = desc.getTagOrder(); + byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); final int outputColumnCount = desc.getOutputColumnNames().size(); TypeInfo[] outputTypeInfos = new TypeInfo[outputColumnCount]; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java index e733b70066..c4b11aef68 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java @@ -318,17 +318,17 @@ private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc, // the following arrays are created // [0, 0, 0, 1] --> [T1, T1, T1, T2] (table mapping) // [0, 1, 2, 0] --> [T1.0, T1.1, T1.2, T2.0] (table columns mapping) - Byte[] tagOrder = smbJoinDesc.getTagOrder(); + byte[] tagOrder = smbJoinDesc.getTagOrder(); Map> retainList = smbJoinDesc.getRetainList(); int totalNumberColumns = 0; - for (Byte tag : tagOrder) { + for (byte tag : tagOrder) { totalNumberColumns += retainList.get(tag).size(); } byte[] columnTableMappings = new byte[totalNumberColumns]; int[] columnNumberMappings = new int[totalNumberColumns]; int currentColumnPosition = 0; - for (Byte tag : tagOrder) { + for (byte tag : tagOrder) { for (int pos = 0; pos < retainList.get(tag).size(); pos++) { columnTableMappings[currentColumnPosition] = tag; columnNumberMappings[currentColumnPosition] = pos; @@ -459,7 +459,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List sourceTableSortCols = new ArrayList(); op = op.getParentOperators().get(0); - boolean isSrcMmTable = false; while (true) { if (!(op instanceof TableScanOperator) && !(op instanceof FilterOperator) && diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java index c5553fbdd0..1f58b4f72c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/FixedBucketPruningOptimizer.java @@ -37,11 +37,9 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.PrunerOperatorFactory.FilterPruner; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -262,52 +260,15 @@ private Object convertLiteral(Object o) { } } - public final class FixedBucketPruningOptimizerCtxt implements - NodeProcessorCtx { - public final ParseContext pctx; + private static final class FixedBucketPruningOptimizerCtxt implements NodeProcessorCtx { + private final ParseContext pctx; private final boolean compat; - private int numBuckets; - private PrunedPartitionList partitions; - private List bucketCols; - private List schema; - public FixedBucketPruningOptimizerCtxt(boolean compat, ParseContext pctx) { + FixedBucketPruningOptimizerCtxt(boolean compat, ParseContext pctx) { this.compat = compat; this.pctx = pctx; } - public void setSchema(ArrayList fields) { - this.schema = fields; - } - - public List getSchema() { - return this.schema; - } - - public void setBucketCols(List bucketCols) { - this.bucketCols = bucketCols; - } - - public List getBucketCols() { - return this.bucketCols; - } - - public void setPartitions(PrunedPartitionList partitions) { - this.partitions = partitions; - } - - public PrunedPartitionList getPartitions() { - return this.partitions; - } - - public int getNumBuckets() { - return numBuckets; - } - - public void setNumBuckets(int numBuckets) { - this.numBuckets = numBuckets; - } - // compatibility mode enabled public boolean isCompat() { return this.compat; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java index b39f1d3999..208b28c761 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java @@ -152,8 +152,8 @@ private void reorder(JoinOperator joinOp, Set bigTables) { // Reorder tags if need be if (biggestPos != (count - 1)) { - Byte[] tagOrder = joinOp.getConf().getTagOrder(); - Byte temp = tagOrder[biggestPos]; + byte[] tagOrder = joinOp.getConf().getTagOrder(); + byte temp = tagOrder[biggestPos]; tagOrder[biggestPos] = tagOrder[count - 1]; tagOrder[count - 1] = temp; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 5ed43c7996..321577902f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -388,7 +388,7 @@ private static boolean checkFullOuterMapJoinCompatible(HiveConf hiveConf, } // Check for supported key data types. - Byte[] order = joinDesc.getTagOrder(); + byte[] order = joinDesc.getTagOrder(); ExprNodeDesc[][] joinKeysArray = joinDesc.getJoinKeys(); for (int i = 0; i < order.length; i++) { byte pos = order[i]; @@ -1273,7 +1273,7 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, int mapJoinPos, boolean noCheckOuterJoin, boolean adjustParentsChildren) throws SemanticException { JoinDesc desc = op.getConf(); JoinCondDesc[] condns = desc.getConds(); - Byte[] tagOrder = desc.getTagOrder(); + byte[] tagOrder = desc.getTagOrder(); // outer join cannot be performed on a table which is being cached if (!noCheckOuterJoin) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index 9377fd282c..04551b8204 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -169,8 +169,7 @@ private void findPossibleAutoConvertedJoinOperators() throws SemanticException { } JoinDesc joinDesc = joinOp.getConf(); - Byte[] order = joinDesc.getTagOrder(); - int numAliases = order.length; + int numAliases = joinDesc.getTagOrder().length; Set bigTableCandidates = MapJoinProcessor.getBigTableCandidates(joinDesc.getConds()); if (bigTableCandidates.isEmpty()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java index 01fb734f73..3c0b6c2cfa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java @@ -139,7 +139,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, JoinOperator jop = (JoinOperator)nd; JoinDesc joinDesc = jop.getConf(); - Byte[] order = joinDesc.getTagOrder(); + byte[] order = joinDesc.getTagOrder(); Map> expressions = joinDesc.getExprs(); List outputValNames = joinDesc.getOutputColumnNames(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index f7cedfe3be..8cfd80462a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -128,14 +128,14 @@ public static void processSkewJoin(JoinOperator joinOp, Map bigKeysDirMap = new HashMap(); Map> smallKeysDirMap = new HashMap>(); Map skewJoinJobResultsDir = new HashMap(); - Byte[] tags = joinDescriptor.getTagOrder(); + byte[] tags = joinDescriptor.getTagOrder(); for (int i = 0; i < numAliases; i++) { - Byte alias = tags[i]; + byte alias = tags[i]; bigKeysDirMap.put(alias, getBigKeysDir(baseTmpDir, alias)); Map smallKeysMap = new HashMap(); smallKeysDirMap.put(alias, smallKeysMap); - for (Byte src2 : tags) { - if (!src2.equals(alias)) { + for (byte src2 : tags) { + if (src2 != alias) { smallKeysMap.put(src2, getSmallKeysDir(baseTmpDir, alias, src2)); } } @@ -168,12 +168,12 @@ public static void processSkewJoin(JoinOperator joinOp, // used for create mapJoinDesc, should be in order List newJoinValueTblDesc = new ArrayList(); - for (Byte tag : tags) { + for (byte tag : tags) { newJoinValueTblDesc.add(null); } for (int i = 0; i < numAliases; i++) { - Byte alias = tags[i]; + byte alias = tags[i]; List valueCols = joinValues.get(alias); String colNames = ""; String colTypes = ""; @@ -186,7 +186,7 @@ public static void processSkewJoin(JoinOperator joinOp, for (int k = 0; k < columnSize; k++) { TypeInfo type = valueCols.get(k).getTypeInfo(); String newColName = i + "_VALUE_" + k; // any name, it does not matter. - ColumnInfo columnInfo = new ColumnInfo(newColName, type, alias.toString(), false); + ColumnInfo columnInfo = new ColumnInfo(newColName, type, Byte.toString(alias), false); columnInfos.add(columnInfo); newValueExpr.add(new ExprNodeColumnDesc(columnInfo)); if (!first) { @@ -208,7 +208,7 @@ public static void processSkewJoin(JoinOperator joinOp, colNames = colNames + joinKeys.get(k); colTypes = colTypes + joinKeyTypes.get(k); ColumnInfo columnInfo = new ColumnInfo(joinKeys.get(k), TypeInfoFactory - .getPrimitiveTypeInfo(joinKeyTypes.get(k)), alias.toString(), false); + .getPrimitiveTypeInfo(joinKeyTypes.get(k)), Byte.toString(alias), false); columnInfos.add(columnInfo); newKeyExpr.add(new ExprNodeColumnDesc(columnInfo)); } @@ -240,7 +240,7 @@ public static void processSkewJoin(JoinOperator joinOp, joinDescriptor.setKeyTableDesc(keyTblDesc); for (int i = 0; i < numAliases - 1; i++) { - Byte src = tags[i]; + byte src = tags[i]; MapWork newPlan = PlanUtils.getMapRedWork().getMapWork(); // This code has been only added for testing @@ -262,7 +262,7 @@ public static void processSkewJoin(JoinOperator joinOp, Operator tblScan_op = parentOps[i]; ArrayList aliases = new ArrayList(); - String alias = src.toString().intern(); + String alias = Byte.toString(src).intern(); aliases.add(alias); Path bigKeyDirPath = bigKeysDirMap.get(src); newPlan.addPathToAlias(bigKeyDirPath, aliases); @@ -297,12 +297,12 @@ public static void processSkewJoin(JoinOperator joinOp, if (j == i) { continue; } - Byte small_alias = tags[j]; + byte smallAlias = tags[j]; Operator tblScan_op2 = parentOps[j]; - localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2); - Path tblDir = smallTblDirs.get(small_alias); - localPlan.getAliasToFetchWork().put(small_alias.toString(), - new FetchWork(tblDir, tableDescList.get(small_alias))); + localPlan.getAliasToWork().put(Byte.toString(smallAlias), tblScan_op2); + Path tblDir = smallTblDirs.get(smallAlias); + localPlan.getAliasToFetchWork().put(Byte.toString(smallAlias), + new FetchWork(tblDir, tableDescList.get(smallAlias))); } newPlan.setMapRedLocalWork(localPlan); @@ -375,7 +375,7 @@ public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) { } byte pos = 0; - for (Byte tag : joinOp.getConf().getTagOrder()) { + for (byte tag : joinOp.getConf().getTagOrder()) { if (tag != pos) { return false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 7f7f49ba8c..6b5dee5ba3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -81,7 +81,7 @@ private GenSparkSkewJoinProcessor() { // prevent instantiation } - @SuppressWarnings("unchecked") + // @formatter:off public static void processSkewJoin(JoinOperator joinOp, Task currTask, ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException { @@ -102,15 +102,15 @@ public static void processSkewJoin(JoinOperator joinOp, Task bigKeysDirMap = new HashMap(); Map> smallKeysDirMap = new HashMap>(); Map skewJoinJobResultsDir = new HashMap(); - Byte[] tags = joinDescriptor.getTagOrder(); + byte[] tags = joinDescriptor.getTagOrder(); // for each joining table, set dir for big key and small keys properly for (int i = 0; i < numAliases; i++) { - Byte alias = tags[i]; + byte alias = tags[i]; bigKeysDirMap.put(alias, GenMRSkewJoinProcessor.getBigKeysDir(baseTmpDir, alias)); Map smallKeysMap = new HashMap(); smallKeysDirMap.put(alias, smallKeysMap); - for (Byte src2 : tags) { - if (!src2.equals(alias)) { + for (byte src2 : tags) { + if (src2 != alias) { smallKeysMap.put(src2, GenMRSkewJoinProcessor.getSmallKeysDir(baseTmpDir, alias, src2)); } } @@ -143,7 +143,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task valueCols = joinValues.get(alias); String colNames = ""; String colTypes = ""; @@ -156,10 +156,9 @@ public static void processSkewJoin(JoinOperator joinOp, Task listWorks = new ArrayList(); List> listTasks = new ArrayList>(); for (int i = 0; i < numAliases - 1; i++) { - Byte src = tags[i]; + byte src = tags[i]; HiveConf hiveConf = new HiveConf(parseCtx.getConf(), GenSparkSkewJoinProcessor.class); SparkWork sparkWork = new SparkWork(parseCtx.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID)); @@ -281,7 +279,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task tableScan = parentOps[j]; - String alias = tags[j].toString(); + String alias = Byte.toString(tags[j]); ArrayList aliases = new ArrayList(); aliases.add(alias); Path path; @@ -353,11 +351,11 @@ public static void processSkewJoin(JoinOperator joinOp, Task>()); currTask.addDependentTask(cndTsk); } + // @formatter:on /** * Insert SparkHashTableSink and HashTableDummy between small dir TS and MJ. */ - @SuppressWarnings("unchecked") private static void insertSHTS(byte tag, TableScanOperator tableScan, MapWork bigMapWork) { Preconditions.checkArgument(tableScan.getChildOperators().size() == 1 && tableScan.getChildOperators().get(0) instanceof MapJoinOperator); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index ebf17085fc..1c8f7b002f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -263,8 +263,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) // generate a map join task for the big table SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf(); - Byte[] order = originalSMBJoinDesc.getTagOrder(); - int numAliases = order.length; + int numAliases = originalSMBJoinDesc.getTagOrder().length; Set bigTableCandidates = MapJoinProcessor.getBigTableCandidates(originalSMBJoinDesc.getConds()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index b650299a9a..1171eda63f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -2582,8 +2582,8 @@ private boolean validateMapJoinDesc(MapJoinDesc desc) { if (!validateExprNodeDesc(valueExprs, "Value")) { return false; } - Byte[] order = desc.getTagOrder(); - Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); + byte[] order = desc.getTagOrder(); + byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); List smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable); if (!validateExprNodeDesc(smallTableExprs, "Small Table")) { return false; @@ -3265,9 +3265,9 @@ private void fixupParentChildOperators(Operator op, } private boolean isBigTableOnlyResults(MapJoinDesc desc) { - Byte[] order = desc.getTagOrder(); + byte[] order = desc.getTagOrder(); byte posBigTable = (byte) desc.getPosBigTable(); - Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); + byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); int[] smallTableIndices; int smallTableIndicesSize; @@ -3663,8 +3663,8 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi VectorColumnSourceMapping smallTableValueMapping = new VectorColumnSourceMapping("Small Table Value Mapping"); - Byte[] order = desc.getTagOrder(); - Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); + byte[] order = desc.getTagOrder(); + byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]); boolean isOuterJoin = !desc.getNoOuterJoin(); /* @@ -3742,7 +3742,6 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi // Small table indices has more information (i.e. keys) than retain, so use it if it exists... if (smallTableIndicesSize > 0) { - for (int i = 0; i < smallTableIndicesSize; i++) { if (smallTableIndices[i] >= 0) { // Zero and above numbers indicate a big table key is needed for diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java index d71ba5b685..e4784eb425 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java @@ -66,7 +66,7 @@ protected JoinCondDesc[] conds; - protected Byte[] tagOrder; + private byte[] tagOrder; private TableDesc keyTableDesc; @@ -259,12 +259,12 @@ public void setConds(JoinCondDesc[] conds) { } @Override - public Byte[] getTagOrder() { + public byte[] getTagOrder() { return tagOrder; } @Override - public void setTagOrder(Byte[] tagOrder) { + public void setTagOrder(byte[] tagOrder) { this.tagOrder = tagOrder; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java index 2c93c2a760..3e55ae1f61 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java @@ -88,7 +88,7 @@ protected JoinCondDesc[] conds; - protected Byte[] tagOrder; + private byte[] tagOrder; private TableDesc keyTableDesc; // this operator cannot be converted to mapjoin cause output is expected to be sorted on join key @@ -135,7 +135,7 @@ public JoinDesc(final Map> exprs, // called by late-MapJoin processor (hive.auto.convert.join=true for example) public void resetOrder() { - tagOrder = new Byte[exprs.size()]; + tagOrder = new byte[exprs.size()]; for (int i = 0; i < tagOrder.length; i++) { tagOrder[i] = (byte) i; } @@ -415,7 +415,7 @@ public void setConds(final JoinCondDesc[] conds) { * * @return Array of tags */ - public Byte[] getTagOrder() { + public byte[] getTagOrder() { return tagOrder; } @@ -425,7 +425,7 @@ public void setConds(final JoinCondDesc[] conds) { * @param tagOrder * Array of tags */ - public void setTagOrder(Byte[] tagOrder) { + public void setTagOrder(byte[] tagOrder) { this.tagOrder = tagOrder; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 093a62920d..e5fac29e35 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -28,13 +28,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; @@ -340,7 +338,7 @@ public String getDebugValueContext() { try { boolean noOuterJoin = getNoOuterJoin(); // Order in which the results should be output. - Byte[] order = getTagOrder(); + byte[] order = getTagOrder(); int[][] filterMaps = getFilterMap(); for (int pos = 0; pos < order.length; pos++) { @@ -749,41 +747,4 @@ public MapJoinOperatorExplainVectorization getMapJoinVectorization() { return new MapJoinOperatorExplainVectorization(this, vectorMapJoinDesc); } - public class SMBJoinOperatorExplainVectorization extends OperatorExplainVectorization { - - private final SMBJoinDesc smbJoinDesc; - private final VectorSMBJoinDesc vectorSMBJoinDesc; - - public SMBJoinOperatorExplainVectorization(SMBJoinDesc smbJoinDesc, - VectorSMBJoinDesc vectorSMBJoinDesc) { - // Native vectorization NOT supported. - super(vectorSMBJoinDesc, false); - this.smbJoinDesc = smbJoinDesc; - this.vectorSMBJoinDesc = vectorSMBJoinDesc; - } - } - - // Handle dual nature. - @Explain(vectorization = Vectorization.OPERATOR, displayName = "SMB Map Join Vectorization", - explainLevels = { Level.DEFAULT, Level.EXTENDED }) - public SMBJoinOperatorExplainVectorization getSMBJoinVectorization() { - VectorSMBJoinDesc vectorSMBJoinDesc = (VectorSMBJoinDesc) getVectorDesc(); - if (vectorSMBJoinDesc == null || !(this instanceof SMBJoinDesc)) { - return null; - } - return new SMBJoinOperatorExplainVectorization((SMBJoinDesc) this, vectorSMBJoinDesc); - } - - @Override - public boolean isSame(OperatorDesc other) { - if (super.isSame(other)) { - MapJoinDesc otherDesc = (MapJoinDesc) other; - return Objects.equals(getParentToInput(), otherDesc.getParentToInput()) && - Objects.equals(getKeyCountsExplainDesc(), otherDesc.getKeyCountsExplainDesc()) && - getPosBigTable() == otherDesc.getPosBigTable() && - isBucketMapJoin() == otherDesc.isBucketMapJoin(); - } - return false; - } - } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java index d127342061..63f88d8b41 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java @@ -125,6 +125,7 @@ public RowTestObjectsMultiSet getTestRowMultiSet() { return testRowMultiSet; } + @Override public void nextTestRow(RowTestObjects testRow) { testRowMultiSet.add(testRow, RowTestObjectsMultiSet.RowFlag.NONE); } @@ -159,6 +160,7 @@ public TestMultiSetVectorCollectorOperator( this.testRowMultiSet = testRowMultiSet; } + @Override public void nextTestRow(RowTestObjects testRow) { testRowMultiSet.add(testRow, RowTestObjectsMultiSet.RowFlag.NONE); } @@ -241,7 +243,7 @@ public static MapJoinDesc createMapJoinDesc(MapJoinTestDescription testDesc, mapJoinDesc.setKeys(keyMap); mapJoinDesc.setExprs(exprMap); - Byte[] order = new Byte[] {(byte) 0, (byte) 1}; + byte[] order = new byte[] {(byte) 0, (byte) 1}; mapJoinDesc.setTagOrder(order); mapJoinDesc.setNoOuterJoin( testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER && @@ -680,7 +682,7 @@ public static MapJoinTableContainerSerDe createMapJoinTableContainerSerDe(MapJoi final Byte smallTablePos = 1; TableDesc keyTableDesc = mapJoinDesc.getKeyTblDesc(); - AbstractSerDe keySerializer = (AbstractSerDe) ReflectionUtil.newInstance( + AbstractSerDe keySerializer = ReflectionUtil.newInstance( BinarySortableSerDe.class, null); SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null); MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false); @@ -768,7 +770,7 @@ private static void loadTableContainerData(MapJoinTestDescription testDesc, MapJ Writable keyWritable = (Writable) smallTableKey[index]; VerifyFastRow.serializeWrite( - keySerializeWrite, (PrimitiveTypeInfo) testDesc.bigTableKeyTypeInfos[index], keyWritable); + keySerializeWrite, testDesc.bigTableKeyTypeInfos[index], keyWritable); } keyBytesWritable.set(keyOutput.getData(), 0, keyOutput.getLength()); @@ -785,7 +787,7 @@ private static void loadTableContainerData(MapJoinTestDescription testDesc, MapJ Writable valueWritable = (Writable) smallTableValue[index]; VerifyFastRow.serializeWrite( - valueSerializeWrite, (PrimitiveTypeInfo) testDesc.smallTableValueTypeInfos[index], valueWritable); + valueSerializeWrite, testDesc.smallTableValueTypeInfos[index], valueWritable); } valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); mapJoinTableContainer.putRow(keyBytesWritable, valueBytesWritable); @@ -1029,7 +1031,7 @@ public static CreateMapJoinResult createMapJoinImplementation( MapJoinOperator mapJoinOperator, int bigTableKeySize, int bigTableRetainSize, String[] outputColumnNames, TypeInfo[] outputTypeInfos) { - MapJoinDesc mapJoinDesc = (MapJoinDesc) mapJoinOperator.getConf(); + MapJoinDesc mapJoinDesc = mapJoinOperator.getConf(); List selectExprList = new ArrayList(); List selectOutputColumnNameList = new ArrayList(); @@ -1069,12 +1071,12 @@ public static CreateMapJoinResult createMapJoinImplementation( MapJoinOperator mapJoinOperator, int bigTableKeySize, int bigTableRetainSize, Operator selectOperator) throws HiveException{ - MapJoinDesc mapJoinDesc = (MapJoinDesc) mapJoinOperator.getConf(); + MapJoinDesc mapJoinDesc = mapJoinOperator.getConf(); VectorizationContext vOutContext = ((VectorizationContextRegion) mapJoinOperator).getOutputVectorizationContext(); - SelectDesc selectDesc = (SelectDesc) selectOperator.getConf(); + SelectDesc selectDesc = selectOperator.getConf(); List selectExprs = selectDesc.getColList(); VectorExpression[] selectVectorExpr = new VectorExpression[bigTableRetainSize]; @@ -1133,7 +1135,7 @@ public static CountCollectorTestOperator addFullOuterIntercept( MapJoinTableContainerSerDe mapJoinTableContainerSerDe) throws SerDeException, IOException, HiveException { - MapJoinDesc mapJoinDesc = (MapJoinDesc) mapJoinOperator.getConf(); + MapJoinDesc mapJoinDesc = mapJoinOperator.getConf(); // For FULL OUTER MapJoin, we require all Big Keys to be present in the output result. // The first N output columns are the Big Table key columns. @@ -1163,7 +1165,7 @@ public static CountCollectorTestOperator addFullOuterIntercept( mapJoinOutputColumnNames, mapJoinOutputTypeInfos); List selectOutputColumnNameList = - ((SelectDesc) selectOperator.getConf()).getOutputColumnNames(); + selectOperator.getConf().getOutputColumnNames(); String[] selectOutputColumnNames = selectOutputColumnNameList.toArray(new String[0]); diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java index e0d292c7be..4e265e56d4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java @@ -179,34 +179,34 @@ public void testValidateNestedExpressions() { * prepareAbstractMapJoin prepares a join operator descriptor, used as helper by SMB and Map join tests. */ private void prepareAbstractMapJoin(AbstractMapJoinOperator map, MapJoinDesc mjdesc) { - mjdesc.setPosBigTable(0); - List expr = new ArrayList(); - expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false)); - Map> keyMap = new HashMap>(); - keyMap.put((byte)0, expr); - List smallTableExpr = new ArrayList(); - smallTableExpr.add(new ExprNodeColumnDesc(Integer.class, "col2", "T1", false)); - keyMap.put((byte)1, smallTableExpr); - mjdesc.setKeys(keyMap); - mjdesc.setExprs(keyMap); - Byte[] order = new Byte[] {(byte) 0, (byte) 1}; - mjdesc.setTagOrder(order); - - //Set filter expression - GenericUDFOPEqual udf = new GenericUDFOPEqual(); - ExprNodeGenericFuncDesc equalExprDesc = new ExprNodeGenericFuncDesc(); - equalExprDesc.setTypeInfo(TypeInfoFactory.booleanTypeInfo); - equalExprDesc.setGenericUDF(udf); - List children1 = new ArrayList(2); - children1.add(new ExprNodeColumnDesc(Integer.class, "col2", "T1", false)); - children1.add(new ExprNodeColumnDesc(Integer.class, "col3", "T2", false)); - equalExprDesc.setChildren(children1); - List filterExpr = new ArrayList(); - filterExpr.add(equalExprDesc); - Map> filterMap = new HashMap>(); - filterMap.put((byte) 0, expr); - mjdesc.setFilters(filterMap); - } + mjdesc.setPosBigTable(0); + List expr = new ArrayList(); + expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false)); + Map> keyMap = new HashMap>(); + keyMap.put((byte) 0, expr); + List smallTableExpr = new ArrayList(); + smallTableExpr.add(new ExprNodeColumnDesc(Integer.class, "col2", "T1", false)); + keyMap.put((byte) 1, smallTableExpr); + mjdesc.setKeys(keyMap); + mjdesc.setExprs(keyMap); + byte[] order = new byte[] {(byte) 0, (byte) 1}; + mjdesc.setTagOrder(order); + + //Set filter expression + GenericUDFOPEqual udf = new GenericUDFOPEqual(); + ExprNodeGenericFuncDesc equalExprDesc = new ExprNodeGenericFuncDesc(); + equalExprDesc.setTypeInfo(TypeInfoFactory.booleanTypeInfo); + equalExprDesc.setGenericUDF(udf); + List children1 = new ArrayList(2); + children1.add(new ExprNodeColumnDesc(Integer.class, "col2", "T1", false)); + children1.add(new ExprNodeColumnDesc(Integer.class, "col3", "T2", false)); + equalExprDesc.setChildren(children1); + List filterExpr = new ArrayList(); + filterExpr.add(equalExprDesc); + Map> filterMap = new HashMap>(); + filterMap.put((byte) 0, expr); + mjdesc.setFilters(filterMap); + } /** * testValidateMapJoinOperator validates that the Map join operator can be vectorized.