diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 975e108..d1b3e9f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -429,20 +429,12 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
     }
   }
 
-  public static Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps(Configuration hiveConf) {
-    BaseWork baseWork = getMapWork(hiveConf);
-    if (baseWork == null) {
-      baseWork = getReduceWork(hiveConf);
+  public static Map<String, Map<Integer, String>> getMapWorkAllScratchColumnVectorTypeMaps(Configuration hiveConf) throws HiveException {
+    MapWork mapWork = getMapWork(hiveConf);
+    if (mapWork == null) {
+      throw new HiveException("Only valid in MapWork");
     }
-    return baseWork.getAllScratchColumnVectorTypeMaps();
-  }
-
-  public static Map<String, Map<String, Integer>> getAllColumnVectorMaps(Configuration hiveConf) {
-    BaseWork baseWork = getMapWork(hiveConf);
-    if (baseWork == null) {
-      baseWork = getReduceWork(hiveConf);
-    }
-    return baseWork.getAllColumnVectorMaps();
+    return getMapWork(hiveConf).getAllScratchColumnVectorTypeMaps();
   }
 
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
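The retained lookup is now map-side only: instead of silently falling back to the ReduceWork plan, it throws as soon as no MapWork is attached to the configuration. A minimal sketch of a caller under the new contract follows; the helper class and method names are hypothetical, not part of this patch.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;

// Hypothetical caller: resolve the scratch-column type map for one partition
// path. Legal only where a MapWork plan is registered in the configuration.
public class ScratchColumnTypes {
  public static Map<Integer, String> forPartitionPath(Configuration conf, String path)
      throws HiveException {
    // Throws HiveException("Only valid in MapWork") on the reduce side.
    return Utilities.getMapWorkAllScratchColumnVectorTypeMaps(conf).get(path);
  }
}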
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 856ff20..90b4b12 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -76,8 +76,6 @@
   // Create a new outgoing vectorization context because column name map will change.
   private VectorizationContext vOutContext = null;
 
-  private String fileKey;
-
   // The above members are initialized by the constructor and must not be
   // transient.
   //---------------------------------------------------------------------------
@@ -756,7 +754,6 @@ public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf)
 
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
-    fileKey = vOutContext.getFileKey();
   }
 
   public VectorGroupByOperator() {
@@ -796,7 +793,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         outputFieldNames, objectInspectors);
     if (isVectorOutput) {
       vrbCtx = new VectorizedRowBatchCtx();
-      vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
+      vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
       outputBatch = vrbCtx.createVectorizedRowBatch();
       vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
         outputBatch, outputObjInspector, vOutContext.getProjectionColumnMap(), conf.getOutputColumnNames());
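The group-by change above is the template for the two join operators below: the operator's own output VectorizationContext, rather than a fileKey lookup into the plan's scratch maps, seeds the VectorizedRowBatchCtx. A condensed sketch of that shared pattern, assuming a populated vOutContext and output object inspector as in initializeOp; this is not literal patch code.

// Sketch of the shared initialization pattern.
private VectorizedRowBatch buildOutputBatch(VectorizationContext vOutContext,
    StructObjectInspector outputObjInspector) throws HiveException {
  VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
  // Scratch column types now travel with the vectorization context itself.
  vrbCtx.init(vOutContext.getScratchColumnTypeMap(), outputObjInspector);
  return vrbCtx.createVectorizedRowBatch();
}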
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 5fda5dd..2c8aee1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -28,7 +28,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -53,19 +52,16 @@
   */
   private static final long serialVersionUID = 1L;
 
-  /**
-   * Vectorizaiton context key
-   * Used to retrieve column map from the MapTask scratch
-   */
-  private String fileKey;
-  private int tagLen;
-
   private VectorExpression[] keyExpressions;
-  private transient VectorHashKeyWrapperBatch keyWrapperBatch;
-  private transient VectorExpressionWriter[] keyOutputWriters;
 
   private VectorExpression[] bigTableFilterExpressions;
   private VectorExpression[] bigTableValueExpressions;
+
+  private VectorizationContext vOutContext;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
 
   private transient VectorizedRowBatch outputBatch;
   private transient VectorExpressionWriter[] valueWriters;
@@ -76,8 +72,9 @@
   //
   private transient int batchIndex;
   private transient VectorHashKeyWrapper[] keyValues;
-
-  private transient VectorizationContext vOutContext = null;
+  private transient VectorHashKeyWrapperBatch keyWrapperBatch;
+  private transient VectorExpressionWriter[] keyOutputWriters;
+
   private transient VectorizedRowBatchCtx vrbCtx = null;
 
   public VectorMapJoinOperator() {
@@ -96,7 +93,6 @@ public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf)
     numAliases = desc.getExprs().size();
     posBigTable = (byte) desc.getPosBigTable();
     filterMaps = desc.getFilterMap();
-    tagLen = desc.getTagLength();
     noOuterJoin = desc.isNoOuterJoin();
 
     Map<Byte, List<ExprNodeDesc>> filterExpressions = desc.getFilters();
@@ -113,7 +109,6 @@ public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf)
     // We are making a new output vectorized row batch.
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/MAP_JOIN_" + desc.getBigTableAlias());
-    this.fileKey = vOutContext.getFileKey();
   }
 
   @Override
@@ -124,7 +119,7 @@ public void initializeOp(Configuration hconf) throws HiveException {
     keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(hconf, this.fileKey, (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
@@ -193,10 +188,8 @@ protected void internalForward(Object row, ObjectInspector outputOI) throws Hive
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getAllColumnVectorMaps(hconf);
-      Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
-          outputBatch, outputOI, columnMap, conf.getOutputColumnNames());
+          outputBatch, outputOI, vOutContext.getProjectionColumnMap(), conf.getOutputColumnNames());
       outputVectorAssigners.put(outputOI, vcas);
     }
     for (int i=0; i<values.length; ++i) {
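In the row-mode forward path the assigners are now derived from the output context's projection map (output column name to batch column index) rather than from Utilities' per-fileKey column maps. A sketch of the lazy-build step, assuming the operator fields shown above (outputBatch, vOutContext, outputVectorAssigners, and the operator desc conf); the helper name assignersFor is hypothetical.

// Sketch: lazily build per-ObjectInspector assigners from the output context.
private VectorColumnAssign[] assignersFor(ObjectInspector outputOI) throws HiveException {
  VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
  if (vcas == null) {
    vcas = VectorColumnAssignFactory.buildAssigners(
        outputBatch, outputOI,
        vOutContext.getProjectionColumnMap(),   // output column name -> batch index
        conf.getOutputColumnNames());
    outputVectorAssigners.put(outputOI, vcas);
  }
  return vcas;
}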
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -56,15 +56,16 @@
   private static final long serialVersionUID = 1L;
 
-  private String fileKey;
-  private int tagLen;
-
   private VectorExpression[] keyExpressions;
 
   private VectorExpression[] bigTableFilterExpressions;
   private VectorExpression[] bigTableValueExpressions;
+
+  private VectorizationContext vOutContext;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
 
   private transient VectorizedRowBatch outputBatch;
-  private transient VectorizationContext vOutContext = null;
-
   private transient VectorizedRowBatchCtx vrbCtx = null;
   private transient Map<ObjectInspector, VectorColumnAssign[]> outputVectorAssigners;
@@ -98,7 +99,6 @@ public VectorSMBMapJoinOperator(VectorizationContext vContext, OperatorDesc conf
     numAliases = desc.getExprs().size();
     posBigTable = (byte) desc.getPosBigTable();
     filterMaps = desc.getFilterMap();
-    tagLen = desc.getTagLength();
     noOuterJoin = desc.isNoOuterJoin();
 
     // Must obtain vectorized equivalents for filter and value expressions
@@ -117,7 +117,6 @@ public VectorSMBMapJoinOperator(VectorizationContext vContext, OperatorDesc conf
     // We are making a new output vectorized row batch.
     vOutContext = new VectorizationContext(desc.getOutputColumnNames());
     vOutContext.setFileKey(vContext.getFileKey() + "/SMB_JOIN_" + desc.getBigTableAlias());
-    this.fileKey = vOutContext.getFileKey();
   }
 
   @Override
@@ -135,7 +134,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(hconf, this.fileKey, (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
@@ -272,10 +271,8 @@ protected void internalForward(Object row, ObjectInspector outputOI) throws Hive
     Object[] values = (Object[]) row;
     VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI);
     if (null == vcas) {
-      Map<String, Map<String, Integer>> allColumnMaps = Utilities.getAllColumnVectorMaps(hconf);
-      Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
       vcas = VectorColumnAssignFactory.buildAssigners(
-          outputBatch, outputOI, columnMap, conf.getOutputColumnNames());
+          outputBatch, outputOI, vOutContext.getProjectionColumnMap(), conf.getOutputColumnNames());
       outputVectorAssigners.put(outputOI, vcas);
     }
     for (int i = 0; i < values.length; ++i) {
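Both join operators then push each field of a row-mode row into the vectorized output batch through those assigners. A compressed sketch of that bridge, reusing the hypothetical assignersFor helper from the previous sketch and assuming VectorColumnAssign.assignObjectValue(Object, int) as the per-field write.

// Sketch (assumes row, outputOI and batchIndex as in internalForward above).
void forwardRowToBatch(Object row, ObjectInspector outputOI, int batchIndex)
    throws HiveException {
  Object[] values = (Object[]) row;
  VectorColumnAssign[] vcas = assignersFor(outputOI);  // hypothetical helper, see above
  for (int i = 0; i < values.length; ++i) {
    vcas[i].assignObjectValue(values[i], batchIndex);  // write field i into the batch row
  }
}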
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 4f57aac..f3d6a27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -96,7 +96,7 @@
   // list does not contain partition columns
   private List<Integer> colsToInclude;
 
-  private Map<Integer, String> columnTypeMap = null;
+  private Map<Integer, String> scratchColumnTypeMap = null;
 
   /**
    * Constructor for VectorizedRowBatchCtx
@@ -126,36 +126,17 @@ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspect
 
   public VectorizedRowBatchCtx() {
   }
-
-  /**
-   * Initializes the VectorizedRowBatch context based on an arbitrary object inspector
-   * Used by non-tablescan operators when they change the vectorization context
-   * @param hiveConf
-   * @param fileKey
-   *          The key on which to retrieve the extra column mapping from the map/reduce scratch
-   * @param rowOI
-   *          Object inspector that shapes the column types
-   */
-  public void init(Configuration hiveConf, String fileKey,
-      StructObjectInspector rowOI) {
-    Map<String, Map<Integer, String>> scratchColumnVectorTypes =
-        Utilities.getAllScratchColumnVectorTypeMaps(hiveConf);
-    columnTypeMap = scratchColumnVectorTypes.get(fileKey);
-    this.rowOI= rowOI;
-    this.rawRowOI = rowOI;
-  }
-
   /**
    * Initializes the VectorizedRowBatch context based on an scratch column type map and
    * object inspector.
-   * @param columnTypeMap
+   * @param scratchColumnTypeMap
    * @param rowOI
    *          Object inspector that shapes the column types
    */
-  public void init(Map<Integer, String> columnTypeMap,
+  public void init(Map<Integer, String> scratchColumnTypeMap,
       StructObjectInspector rowOI) {
-    this.columnTypeMap = columnTypeMap;
+    this.scratchColumnTypeMap = scratchColumnTypeMap;
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }
 
@@ -179,7 +160,8 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
       IOException,
       SerDeException,
       InstantiationException,
-      IllegalAccessException, HiveException {
+      IllegalAccessException,
+      HiveException {
 
     Map<String, PartitionDesc> pathToPartitionInfo = Utilities
         .getMapRedWork(hiveConf).getMapWork().getPathToPartitionInfo();
@@ -189,8 +171,8 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
         split.getPath(), IOPrepareCache.get().getPartitionDescMap());
 
     String partitionPath = split.getPath().getParent().toString();
-    columnTypeMap = Utilities
-        .getAllScratchColumnVectorTypeMaps(hiveConf)
+    scratchColumnTypeMap = Utilities
+        .getMapWorkAllScratchColumnVectorTypeMaps(hiveConf)
        .get(partitionPath);
 
     Properties partProps =
@@ -613,12 +595,12 @@ public final boolean isPartitionCol(int colnum) {
   }
 
   private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
-    if (columnTypeMap != null && !columnTypeMap.isEmpty()) {
+    if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) {
       int origNumCols = vrb.numCols;
-      int newNumCols = vrb.cols.length+columnTypeMap.keySet().size();
+      int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size();
       vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
       for (int i = origNumCols; i < newNumCols; i++) {
-        String typeName = columnTypeMap.get(i);
+        String typeName = scratchColumnTypeMap.get(i);
        if (typeName == null) {
          throw new HiveException("No type found for column type entry " + i);
        }
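With the Configuration/fileKey overload removed, the renamed overload is the only way left for non-tablescan operators to seed a batch context. A usage sketch, assuming rowOI describes a two-column row; scratch entries are keyed by absolute batch column index, so index 2 is the first scratch slot appended after the projected columns. The class and method names are hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class ScratchBatchExample {
  // Hypothetical helper, assuming rowOI has exactly two columns.
  static VectorizedRowBatch batchWithScratch(StructObjectInspector rowOI) throws HiveException {
    Map<Integer, String> scratchColumnTypeMap = new HashMap<Integer, String>();
    scratchColumnTypeMap.put(2, "bigint");  // one scratch column after the two projected ones
    VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx();
    ctx.init(scratchColumnTypeMap, rowOI);
    // createVectorizedRowBatch() appends scratch columns via addScratchColumnsToBatch().
    return ctx.createVectorizedRowBatch();
  }
}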