diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5f92d11..537e7b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -469,9 +469,9 @@ private static BaseWork getBaseWork(Configuration conf, String name) { } } - public static Map getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) { + public static String[] getMapWorkVectorScratchColumnTypeNames(Configuration hiveConf) { MapWork mapWork = getMapWork(hiveConf); - return mapWork.getVectorScratchColumnTypeMap(); + return mapWork.getVectorScratchColumnTypeNames(); } public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 545d7c6..7e5c2cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -232,7 +232,7 @@ private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector sources[tag] = new ReduceRecordSource(); sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorScratchColumnTypeMap()); + redWork.getVectorScratchColumnTypeNames()); ois[tag] = sources[tag].getObjectInspector(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index cdabe3a..9e22258 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -122,7 +122,7 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag, - Map vectorScratchColumnTypeMap) + String[] vectorScratchColumnTypeNames) throws Exception { ObjectInspector keyObjectInspector; @@ -187,7 +187,7 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); batchContext = new VectorizedRowBatchCtx(); - batchContext.init(vectorScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector); + batchContext.init((StructObjectInspector) rowObjectInspector, vectorScratchColumnTypeNames); batch = batchContext.createVectorizedRowBatch(); // Setup vectorized deserialization for the key and value. 
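The hunks above all follow one pattern: the scratch-column type map, which was keyed by absolute output column index, becomes a positional String[] of type names, and VectorizedRowBatchCtx.init() now takes the row StructObjectInspector first and the scratch type names second. The following caller-side sketch only illustrates that new shape; it is not code from this patch, and buildOutputBatch and its parameters are illustrative names.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

// Illustrative helper only -- not part of this patch.
public class BatchContextSketch {
  static VectorizedRowBatch buildOutputBatch(StructObjectInspector rowObjectInspector,
      String[] scratchColumnTypeNames) throws HiveException {
    // New signature: row object inspector first, then the scratch column type names
    // (previously a map keyed by absolute column index).
    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
    vrbCtx.init(rowObjectInspector, scratchColumnTypeNames);
    return vrbCtx.createVectorizedRowBatch();
  }
}

Because the scratch types are now positional and simply appended after the row columns when the batch is created, callers no longer need to track absolute column indexes for scratch columns.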
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 39a83e3..fa66964 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -814,7 +814,7 @@ public VectorGroupByOperator() { outputFieldNames, objectInspectors); if (isVectorOutput) { vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector); + vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); vectorAssignRowSameBatch = new VectorAssignRowSameBatch(); vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java index 0baec2c..d9f5d2f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java @@ -91,7 +91,7 @@ public VectorMapJoinBaseOperator (VectorizationContext vContext, OperatorDesc co Collection> result = super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); + vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index e9bd44a..946b035 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -123,6 +123,11 @@ public void assign(VectorExpressionWriter[] writers, List keyDesc = conf.getKeys().get(posBigTable); keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc); + vrbCtx = new VectorizedRowBatchCtx(); + vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames()); + + outputBatch = vrbCtx.createVectorizedRowBatch(); + keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); Map> valueExpressions = conf.getExprs(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java index a2f8091..ce6a121 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java @@ -135,7 +135,7 @@ public VectorSMBMapJoinOperator(VectorizationContext vContext, OperatorDesc conf Collection> result = super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); + vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 392e56d..2e8b971 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -133,6 +133,8 @@ VectorExpressionDescriptor vMap; + private List initialColumnNames; + private List projectedColumns; private List projectionColumnNames; private Map projectionColumnMap; @@ -147,6 +149,7 @@ public VectorizationContext(String contextName, List initialColumnNames) this.contextName = contextName; level = 0; LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames.toString()); + this.initialColumnNames = initialColumnNames; this.projectionColumnNames = initialColumnNames; projectedColumns = new ArrayList(); @@ -167,7 +170,8 @@ public VectorizationContext(String contextName) { this.contextName = contextName; level = 0; LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level); - projectedColumns = new ArrayList(); + initialColumnNames = new ArrayList(); + projectedColumns = new ArrayList(); projectionColumnNames = new ArrayList(); projectionColumnMap = new HashMap(); this.ocm = new OutputColumnManager(0); @@ -182,6 +186,7 @@ public VectorizationContext(String contextName, VectorizationContext vContext) { this.contextName = contextName; level = vContext.level + 1; LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level); + this.initialColumnNames = vContext.initialColumnNames; this.projectedColumns = new ArrayList(); this.projectionColumnNames = new ArrayList(); this.projectionColumnMap = new HashMap(); @@ -194,6 +199,7 @@ public VectorizationContext(String contextName, VectorizationContext vContext) { // Add an initial column to a vectorization context when // a vectorized row batch is being created. 
public void addInitialColumn(String columnName) { + initialColumnNames.add(columnName); int index = projectedColumns.size(); projectedColumns.add(index); projectionColumnNames.add(columnName); @@ -222,6 +228,10 @@ public void addProjectionColumn(String columnName, int vectorBatchColIndex) { projectionColumnMap.put(columnName, vectorBatchColIndex); } + public List getInitialColumnNames() { + return initialColumnNames; + } + public List getProjectedColumns() { return projectedColumns; } @@ -2196,13 +2206,16 @@ public int firstOutputColumnIndex() { return firstOutputColumnIndex; } - public Map getScratchColumnTypeMap() { - Map map = new HashMap(); + public String[] getScratchColumnTypeNames() { + String[] result = new String[ocm.outputColCount]; for (int i = 0; i < ocm.outputColCount; i++) { - String type = ocm.outputColumnsTypes[i]; - map.put(i+this.firstOutputColumnIndex, type); + String typeName = ocm.outputColumnsTypes[i]; + // if (typeName.equalsIgnoreCase("long")) { + // typeName = "bigint"; + // } + result[i] = typeName; } - return map; + return result; } @Override @@ -2222,9 +2235,7 @@ public int compare(Integer o1, Integer o2) { } sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", "); - Map sortedScratchColumnTypeMap = new TreeMap(comparerInteger); - sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap()); - sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap); + sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString()); return sb.toString(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 99cb620..7cb9df5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -567,6 +567,34 @@ public static StandardStructObjectInspector convertToStandardStructObjectInspect return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids); } + public static String[] columnNamesFromStructObjectInspector( + StructObjectInspector structObjectInspector) throws HiveException { + + List fields = structObjectInspector.getAllStructFieldRefs(); + String[] result = new String[fields.size()]; + + int i = 0; + for(StructField field : fields) { + result[i++] = field.getFieldName(); + } + return result; + } + + public static TypeInfo[] typeInfosFromStructObjectInspector( + StructObjectInspector structObjectInspector) throws HiveException { + + List fields = structObjectInspector.getAllStructFieldRefs(); + TypeInfo[] result = new TypeInfo[fields.size()]; + + int i = 0; + for(StructField field : fields) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString( + field.getFieldObjectInspector().getTypeName()); + result[i++] = typeInfo; + } + return result; + } + public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector( StructObjectInspector structObjectInspector) throws HiveException { @@ -582,6 +610,18 @@ public static StandardStructObjectInspector convertToStandardStructObjectInspect return result; } + public static TypeInfo[] typeInfosFromTypeNames( + String[] typeNames) throws HiveException { + + TypeInfo[] result = new TypeInfo[typeNames.length]; + + for(int i = 0; i < typeNames.length; i++) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]); + result[i] = typeInfo; + } + return result; + } + public static PrimitiveTypeInfo[] 
primitiveTypeInfosFromTypeNames( String[] typeNames) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index db382cd..51a60e5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -58,6 +59,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.DataOutputBuffer; @@ -66,7 +69,7 @@ import org.apache.hive.common.util.DateUtils; /** - * Context for Vectorized row batch. this calss does eager deserialization of row data using serde + * Context for Vectorized row batch. this class does eager deserialization of row data using serde * in the RecordReader layer. * It has supports partitions in this layer so that the vectorized batch is populated correctly * with the partition column. @@ -75,72 +78,55 @@ private static final Log LOG = LogFactory.getLog(VectorizedRowBatchCtx.class.getName()); - // OI for raw row data (EG without partition cols) - private StructObjectInspector rawRowOI; - - // OI for the row (Raw row OI + partition OI) - private StructObjectInspector rowOI; - - // Deserializer for the row data - private Deserializer deserializer; - - // Hash map of partition values. Key=TblColName value=PartitionValue - private Map partitionValues; - - //partition types - private Map partitionTypes; - - // partition column positions, for use by classes that need to know whether a given column is a - // partition column - private Set partitionCols; - // Column projection list - List of column indexes to include. This // list does not contain partition columns private List colsToInclude; - private Map scratchColumnTypeMap = null; + private String[] rowColumnNames; + private TypeInfo[] rowColumnTypeInfos; + private int nonPartitionColumnCount; + private int partitionColumnCount; + private Object[] partitionValues; - /** - * Constructor for VectorizedRowBatchCtx - * - * @param rawRowOI - * OI for raw row data (EG without partition cols) - * @param rowOI - * OI for the row (Raw row OI + partition OI) - * @param deserializer - * Deserializer for the row data - * @param partitionValues - * Hash map of partition values. 
Key=TblColName value=PartitionValue - */ - public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI, - Deserializer deserializer, Map partitionValues, - Map partitionTypes) { - this.rowOI = rowOI; - this.rawRowOI = rawRowOI; - this.deserializer = deserializer; - this.partitionValues = partitionValues; - this.partitionTypes = partitionTypes; - } + private String[] scratchColumnTypeNames; /** * Constructor for VectorizedRowBatchCtx */ public VectorizedRowBatchCtx() { + } + public String[] getRowColumnNames() { + return rowColumnNames; + } + + public TypeInfo[] getRowColumnTypeInfos() { + return rowColumnTypeInfos; + } + + public int getNonPartitionColumnCount() { + return nonPartitionColumnCount; } /** - * Initializes the VectorizedRowBatch context based on an scratch column type map and + * Initializes the VectorizedRowBatch context based on an scratch column type names and * object inspector. - * @param scratchColumnTypeMap - * @param rowOI + * @param structObjectInspector + * @param scratchColumnTypeNames * Object inspector that shapes the column types + * @throws HiveException */ - public void init(Map scratchColumnTypeMap, - StructObjectInspector rowOI) { - this.scratchColumnTypeMap = scratchColumnTypeMap; - this.rowOI= rowOI; - this.rawRowOI = rowOI; + public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames) + throws HiveException { + + // Row column information. + rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector); + rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector); + partitionColumnCount = 0; + nonPartitionColumnCount = rowColumnTypeInfos.length; + + // Scratch column information. + this.scratchColumnTypeNames = scratchColumnTypeNames; } /** @@ -151,121 +137,59 @@ public void init(Map scratchColumnTypeMap, * Hive configuration using Hive plan is extracted * @param split * File split of the file being read - * @throws ClassNotFoundException - * @throws IOException - * @throws SerDeException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws HiveException + * @throws IOException */ - public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException, - IOException, - SerDeException, - InstantiationException, - IllegalAccessException, - HiveException { + public void init(Configuration hiveConf, FileSplit split) throws IOException { - Map pathToPartitionInfo = Utilities - .getMapWork(hiveConf).getPathToPartitionInfo(); + MapWork mapWork = Utilities.getMapWork(hiveConf); - PartitionDesc part = HiveFileFormatUtils - .getPartitionDescFromPathRecursively(pathToPartitionInfo, - split.getPath(), IOPrepareCache.get().getPartitionDescMap()); + // Row column information. + rowColumnNames = mapWork.getVectorColumnNames(); + rowColumnTypeInfos = mapWork.getVectorColumnTypeInfos(); + partitionColumnCount = mapWork.getVectorPartitionColumnCount(); + nonPartitionColumnCount = rowColumnTypeInfos.length - partitionColumnCount; - String partitionPath = split.getPath().getParent().toString(); - scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf); - // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString()); + // Scratch column information. + scratchColumnTypeNames = mapWork.getVectorScratchColumnTypeNames(); - Properties partProps = - (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ? 
- part.getTableDesc().getProperties() : part.getProperties(); + if (partitionColumnCount > 0) { - Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName()); - Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); - SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(), - partProps); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); + // Need to extract partition values. - deserializer = partDeserializer; - - // Check to see if this split is part of a partition of a table - String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - - String[] partKeys = null; - if (pcols != null && pcols.length() > 0) { - - // Partitions exist for this table. Get the partition object inspector and - // raw row object inspector (row with out partition col) + Map pathToPartitionInfo = Utilities + .getMapWork(hiveConf).getPathToPartitionInfo(); + + PartitionDesc part = HiveFileFormatUtils + .getPartitionDescFromPathRecursively(pathToPartitionInfo, + split.getPath(), IOPrepareCache.get().getPartitionDescMap()); + LinkedHashMap partSpec = part.getPartSpec(); - partKeys = pcols.trim().split("/"); - String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); - String[] partKeyTypes = pcolTypes.trim().split(":"); - - if (partKeys.length > partKeyTypes.length) { - throw new HiveException("Internal error : partKeys length, " +partKeys.length + - " greater than partKeyTypes length, " + partKeyTypes.length); - } - - List partNames = new ArrayList(partKeys.length); - List partObjectInspectors = new ArrayList(partKeys.length); - partitionValues = new LinkedHashMap(); - partitionTypes = new LinkedHashMap(); - for (int i = 0; i < partKeys.length; i++) { - String key = partKeys[i]; - partNames.add(key); - ObjectInspector objectInspector = null; - Object objectVal; + + partitionValues = new Object[partitionColumnCount]; + + for (int i = 0; i < partitionColumnCount; i++) { + Object objectValue; if (partSpec == null) { - // for partitionless table, initialize partValue to empty string. - // We can have partitionless table even if we have partition keys + // For partition-less table, initialize partValue to empty string. + // We can have partition-less table even if we have partition keys // when there is only only partition selected and the partition key is not // part of the projection/include list. - objectVal = null; - objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector; - partitionTypes.put(key, PrimitiveCategory.STRING); + objectValue = null; } else { + String key = rowColumnNames[nonPartitionColumnCount + i]; + // Create a Standard java object Inspector - objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( - TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - objectVal = + ObjectInspector objectInspector = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( + rowColumnTypeInfos[nonPartitionColumnCount + i]); + objectValue = ObjectInspectorConverters. - getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, objectInspector). - convert(partSpec.get(key)); - partitionTypes.put(key, TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]).getPrimitiveCategory()); + getConverter(PrimitiveObjectInspectorFactory. + javaStringObjectInspector, objectInspector). 
+ convert(partSpec.get(key)); } - if (LOG.isDebugEnabled()) { - LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key)); - } - partitionValues.put(key, objectVal); - partObjectInspectors.add(objectInspector); + partitionValues[i] = objectValue; } - - // Create partition OI - StructObjectInspector partObjectInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(partNames, partObjectInspectors); - - // Get row OI from partition OI and raw row OI - StructObjectInspector rowObjectInspector = ObjectInspectorFactory - .getUnionStructObjectInspector(Arrays - .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector})); - rowOI = rowObjectInspector; - rawRowOI = partRawRowObjectInspector; - - // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it - partitionCols = new HashSet(); - if (pcols != null && pcols.length() > 0) { - for (int i = 0; i < partKeys.length; i++) { - partitionCols.add(getColIndexBasedOnColName(partKeys[i])); - } - } - - } else { - - // No partitions for this table, hence row OI equals raw row OI - rowOI = partRawRowObjectInspector; - rawRowOI = partRawRowObjectInspector; } colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf); @@ -279,126 +203,31 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx */ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException { - List fieldRefs = rowOI.getAllStructFieldRefs(); - VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size()); - for (int j = 0; j < fieldRefs.size(); j++) { - // If the column is included in the include list or if the column is a - // partition column then create the column vector. Also note that partition columns are not - // in the included list. 
- if ((colsToInclude == null) || colsToInclude.contains(j) - || ((partitionValues != null) && - partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) { - ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector(); - switch (foi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - // Vectorization currently only supports the following data types: - // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, TIMESTAMP, - // DATE and DECIMAL - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case TIMESTAMP: - case DATE: - case INTERVAL_YEAR_MONTH: - case INTERVAL_DAY_TIME: - result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case FLOAT: - case DOUBLE: - result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case DECIMAL: - DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo(); - result.cols[j] = new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, - tInfo.precision(), tInfo.scale()); - break; - default: - throw new RuntimeException("Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - break; - } - case LIST: - case MAP: - case STRUCT: - case UNION: - throw new HiveException("Vectorizaton is not supported for datatype:" - + foi.getCategory()); - default: - throw new HiveException("Unknown ObjectInspector category!"); - } - } - } - result.numCols = fieldRefs.size(); - this.addScratchColumnsToBatch(result); - result.reset(); - return result; - } + int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; - /** - * Adds the row to the batch after deserializing the row - * - * @param rowIndex - * Row index in the batch to which the row is added - * @param rowBlob - * Row blob (serialized version of row) - * @param batch - * Vectorized batch to which the row is added - * @param buffer a buffer to copy strings into - * @throws HiveException - * @throws SerDeException - */ - public void addRowToBatch(int rowIndex, Writable rowBlob, - VectorizedRowBatch batch, - DataOutputBuffer buffer - ) throws HiveException, SerDeException - { - Object row = this.deserializer.deserialize(rowBlob); - VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer); - } + VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); + for (int i = 0; i < totalColumnCount; i++) { - /** - * Deserialized set of rows and populates the batch - * - * @param rowBlob - * to deserialize - * @param batch - * Vectorized row batch which contains deserialized data - * @throws SerDeException - */ - public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, - VectorizedRowBatch batch) - throws SerDeException { - - if (deserializer instanceof VectorizedSerde) { - ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch); - } else { - throw new SerDeException( - "Not able to deserialize row batch. Serde does not implement VectorizedSerde"); - } - } + // If the column is NOT included in the include list, then do not allocate the column. + // It will not be used. Also note that partition columns are not in the included list. 
+ if (i < nonPartitionColumnCount && + colsToInclude != null && !colsToInclude.contains(i)) { + continue; + } - private int getColIndexBasedOnColName(String colName) throws HiveException - { - List fieldRefs = rowOI.getAllStructFieldRefs(); - for (int i = 0; i < fieldRefs.size(); i++) { - if (fieldRefs.get(i).getFieldName().equals(colName)) { - return i; + if (i < rowColumnTypeInfos.length) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = allocateColumnVector(typeInfo, result.DEFAULT_SIZE); + } else { + String typeName = scratchColumnTypeNames[i - rowColumnTypeInfos.length]; + result.cols[i] = allocateColumnVector(typeName, result.DEFAULT_SIZE); } + } - throw new HiveException("Not able to find column name in row object inspector"); + result.reset(); + return result; } - + /** * Add the partition values to the batch * @@ -407,16 +236,14 @@ private int getColIndexBasedOnColName(String colName) throws HiveException */ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException { - int colIndex; - Object value; - PrimitiveCategory pCategory; if (partitionValues != null) { - for (String key : partitionValues.keySet()) { - colIndex = getColIndexBasedOnColName(key); - value = partitionValues.get(key); - pCategory = partitionTypes.get(key); - - switch (pCategory) { + for (int i = 0; i < partitionColumnCount; i++) { + Object value = partitionValues[i]; + + int colIndex = nonPartitionColumnCount + i; + String partitionColumnName = rowColumnNames[colIndex]; + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; + switch (primitiveTypeInfo.getPrimitiveCategory()) { case BOOLEAN: { LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; if (value == null) { @@ -604,8 +431,8 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti break; default: - throw new HiveException("Unable to recognize the partition type " + pCategory + - " for column " + key); + throw new HiveException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() + + " for column " + partitionColumnName); } } } @@ -613,64 +440,68 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti /** * Determine whether a given column is a partition column - * @param colnum column number in + * @param colNum column number in * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context. * @return true if it is a partition column, false otherwise */ - public final boolean isPartitionCol(int colnum) { - return (partitionCols == null) ? 
false : partitionCols.contains(colnum); + public final boolean isPartitionCol(int colNum) { + return colNum >= nonPartitionColumnCount && colNum < rowColumnTypeInfos.length; } - private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException { - if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) { - int origNumCols = vrb.numCols; - int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size(); - vrb.cols = Arrays.copyOf(vrb.cols, newNumCols); - for (int i = origNumCols; i < newNumCols; i++) { - String typeName = scratchColumnTypeMap.get(i); - if (typeName == null) { - throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString()); - } - vrb.cols[i] = allocateColumnVector(typeName, - VectorizedRowBatch.DEFAULT_SIZE); - } - vrb.numCols = vrb.cols.length; + public static ColumnVector allocateColumnVector(String typeName, int defaultSize) throws HiveException { + typeName = typeName.toLowerCase(); + + // Allow undecorated CHAR and VARCHAR to support scratch column type names. + if (typeName.equals("char") || typeName.equals("varchar")) { + return new BytesColumnVector(defaultSize); + } else if (typeName.equals("long")) { + typeName = "bigint"; } - } - /** - * Get the scale and precision for the given decimal type string. The decimal type is assumed to be - * of the format decimal(precision,scale) e.g. decimal(20,10). - * @param decimalType The given decimal type string. - * @return An integer array of size 2 with first element set to precision and second set to scale. - */ - private static int[] getScalePrecisionFromDecimalType(String decimalType) { - Pattern p = Pattern.compile("\\d+"); - Matcher m = p.matcher(decimalType); - m.find(); - int precision = Integer.parseInt(m.group()); - m.find(); - int scale = Integer.parseInt(m.group()); - int [] precScale = { precision, scale }; - return precScale; + TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName); + return allocateColumnVector(typeInfo, defaultSize); } - public static ColumnVector allocateColumnVector(String type, int defaultSize) { - if (type.equalsIgnoreCase("double")) { - return new DoubleColumnVector(defaultSize); - } else if (VectorizationContext.isStringFamily(type)) { - return new BytesColumnVector(defaultSize); - } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){ - int [] precisionScale = getScalePrecisionFromDecimalType(type); - return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]); - } else if (type.equalsIgnoreCase("long") || - type.equalsIgnoreCase("date") || - type.equalsIgnoreCase("timestamp") || - type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) || - type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) { - return new LongColumnVector(defaultSize); - } else { - throw new RuntimeException("Cannot allocate vector column for " + type); + public static ColumnVector allocateColumnVector(TypeInfo typeInfo, int defaultSize) throws HiveException { + switch (typeInfo.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; + switch (primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case TIMESTAMP: + case DATE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return new LongColumnVector(defaultSize); + case FLOAT: + case DOUBLE: + return new DoubleColumnVector(defaultSize); + case BINARY: + case STRING: + case 
CHAR: + case VARCHAR: + return new BytesColumnVector(defaultSize); + case DECIMAL: + DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo; + return new DecimalColumnVector(defaultSize, + decimalTypeInfo.precision(), decimalTypeInfo.scale()); + default: + throw new RuntimeException("Vectorizaton is not supported for datatype:" + + primitiveTypeInfo.getPrimitiveCategory().name()); + } + } + case LIST: + case MAP: + case STRUCT: + case UNION: + throw new HiveException("Vectorizaton is not supported for datatype:" + + typeInfo.getCategory()); + default: + throw new HiveException("Unknown type category " + typeInfo.getCategory().name()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 4c8c4b1..ed77292 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -664,22 +664,12 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) { * build join output results in. */ protected VectorizedRowBatch setupOverflowBatch() throws HiveException { + + int initialColumnCount = vContext.firstOutputColumnIndex(); VectorizedRowBatch overflowBatch; - Map scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap(); - int maxColumn = 0; - for (int i = 0; i < outputProjection.length; i++) { - int outputColumn = outputProjection[i]; - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - for (int outputColumn : scratchColumnTypeMap.keySet()) { - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - overflowBatch = new VectorizedRowBatch(maxColumn + 1); + int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length; + overflowBatch = new VectorizedRowBatch(totalNumColumns); // First, just allocate just the projection columns we will be using. for (int i = 0; i < outputProjection.length; i++) { @@ -689,9 +679,9 @@ protected VectorizedRowBatch setupOverflowBatch() throws HiveException { } // Now, add any scratch columns needed for children operators. - for (int outputColumn : scratchColumnTypeMap.keySet()) { - String typeName = scratchColumnTypeMap.get(outputColumn); - allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName); + int outputColumn = initialColumnCount; + for (String typeName : vOutContext.getScratchColumnTypeNames()) { + allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName); } overflowBatch.projectedColumns = outputProjection; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java deleted file mode 100644 index faad5f2..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -/** - * A MapReduce/Hive Vectorized input format for RC files. - */ -public class VectorizedRCFileInputFormat extends FileInputFormat - implements InputFormatChecker { - - public VectorizedRCFileInputFormat() { - setMinSplitSize(SequenceFile.SYNC_INTERVAL); - } - - @Override - @SuppressWarnings("unchecked") - public RecordReader getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - reporter.setStatus(split.toString()); - - return new VectorizedRCFileRecordReader(job, (FileSplit) split); - } - - @Override - public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList files) throws IOException { - if (files.size() <= 0) { - return false; - } - for (int fileId = 0; fileId < files.size(); fileId++) { - RCFile.Reader reader = null; - try { - reader = new RCFile.Reader(fs, files.get(fileId) - .getPath(), conf); - reader.close(); - reader = null; - } catch (IOException e) { - return false; - } finally { - if (null != reader) { - reader.close(); - } - } - } - return true; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java deleted file mode 100644 index 4cc1c2f..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.WeakHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; -import org.apache.hadoop.hive.ql.io.RCFile.Reader; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; - -/** - * RCFileRecordReader. - */ -public class VectorizedRCFileRecordReader implements RecordReader { - - private final Reader in; - private final long start; - private final long end; - private boolean more = true; - protected Configuration conf; - private final FileSplit split; - private final boolean useCache; - private VectorizedRowBatchCtx rbCtx; - private final LongWritable keyCache = new LongWritable(); - private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable(); - private boolean addPartitionCols = true; - private final DataOutputBuffer buffer = new DataOutputBuffer(); - - private static RCFileSyncCache syncCache = new RCFileSyncCache(); - - private static final class RCFileSyncEntry { - long end; - long endSync; - } - - private static final class RCFileSyncCache { - - private final Map cache; - - public RCFileSyncCache() { - cache = Collections.synchronizedMap(new WeakHashMap()); - } - - public void put(FileSplit split, long endSync) { - Path path = split.getPath(); - long end = split.getStart() + split.getLength(); - String key = path.toString() + "+" + String.format("%d", end); - - RCFileSyncEntry entry = new RCFileSyncEntry(); - entry.end = end; - entry.endSync = endSync; - if (entry.endSync >= entry.end) { - cache.put(key, entry); - } - } - - public long get(FileSplit split) { - Path path = split.getPath(); - long start = split.getStart(); - String key = path.toString() + "+" + String.format("%d", start); - RCFileSyncEntry entry = cache.get(key); - if (entry != null) { - return entry.endSync; - } - return -1; - } - } - - public VectorizedRCFileRecordReader(Configuration conf, FileSplit split) - throws IOException { - - Path path = split.getPath(); - FileSystem fs = path.getFileSystem(conf); - this.in = new RCFile.Reader(fs, path, conf); - this.end = split.getStart() + split.getLength(); - this.conf = conf; - this.split = split; - - useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE); - - if (split.getStart() > in.getPosition()) { - long oldSync = useCache ? 
syncCache.get(split) : -1; - if (oldSync == -1) { - in.sync(split.getStart()); // sync to start - } else { - in.seek(oldSync); - } - } - - this.start = in.getPosition(); - - more = start < end; - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public Class getKeyClass() { - return LongWritable.class; - } - - public Class getValueClass() { - return BytesRefArrayWritable.class; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - VectorizedRowBatch result; - try { - result = rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } - return result; - } - - public boolean nextBlock() throws IOException { - return in.nextBlock(); - } - - @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - - // Reset column fields noNull values to true - VectorizedBatchUtil.setNoNullFields(value); - buffer.reset(); - value.selectedInUse = false; - for (int i = 0; i < value.numCols; i++) { - value.cols[i].isRepeating = false; - } - - int i = 0; - try { - - for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { - more = next(keyCache); - if (more) { - // Check and update partition cols if necessary. Ideally this should be done - // in CreateValue() as the partition is constant per split. But since Hive uses - // CombineHiveRecordReader and as this does not call CreateValue() for - // each new RecordReader it creates, this check is required in next() - if (addPartitionCols) { - rbCtx.addPartitionColsToBatch(value); - addPartitionCols = false; - } - in.getCurrentRow(colsCache); - // Currently RCFile reader does not support reading vectorized - // data. Populating the batch by adding one row at a time. - rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer); - } else { - break; - } - } - } catch (Exception e) { - throw new RuntimeException("Error while getting next row", e); - } - value.size = i; - return more; - } - - protected boolean next(LongWritable key) throws IOException { - if (!more) { - return false; - } - - more = in.next(key); - - long lastSeenSyncPos = in.lastSeenSyncPos(); - - if (lastSeenSyncPos >= end) { - if (useCache) { - syncCache.put(split, lastSeenSyncPos); - } - more = false; - return more; - } - return more; - } - - /** - * Return the progress within the input split. 
- * - * @return 0.0 to 1.0 of the input byte range - */ - public float getProgress() throws IOException { - if (end == start) { - return 0.0f; - } else { - return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start)); - } - } - - public long getPos() throws IOException { - return in.getPosition(); - } - - public KeyBuffer getKeyBuffer() { - return in.getCurrentKeyBufferObj(); - } - - protected void seek(long pos) throws IOException { - in.seek(pos); - } - - public void sync(long pos) throws IOException { - in.sync(pos); - } - - public void resetBuffer() { - in.resetBuffer(); - } - - public long getStart() { - return start; - } - - public void close() throws IOException { - in.close(); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java index dba9071..23d3e7c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; /** * A row-by-row iterator for ORC files. @@ -41,6 +42,13 @@ Object next(Object previous) throws IOException; /** + * Set the vector column information expected by the reader. + * @param rowColumnNames the expected column names. + * @param rowColumnTypeInfos the type information for the columns. + */ + void setVectorColumnInfo(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos); + + /** * Read the next row batch. The size of the batch to read cannot be controlled * by the callers. Caller need to look at VectorizedRowBatch.size of the retunred * object to know the batch size read. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 58e19cb..480c314 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; import org.apache.hadoop.io.Text; @@ -92,6 +93,9 @@ private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool(); private final ZeroCopyReaderShim zcr; + private String[] rowColumnNames; + private TypeInfo[] rowColumnTypeInfos; + public final static class Index { OrcProto.RowIndex[] rowGroupIndex; OrcProto.BloomFilterIndex[] bloomFilterIndex; @@ -1049,6 +1053,13 @@ public Object next(Object previous) throws IOException { } @Override + public void setVectorColumnInfo(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos) { + // TODO Auto-generated method stub + this.rowColumnNames = rowColumnNames; + this.rowColumnTypeInfos = rowColumnTypeInfos; + } + + @Override public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException { try { final VectorizedRowBatch result; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index a8e5c2e..bdd5f79 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -58,19 +58,7 @@ this.rowBatchCtx = new VectorizedRowBatchCtx(); this.value = inner.createValue(); this.objectInspector = inner.getObjectInspector(); - try { - rowBatchCtx.init(conf, split); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to initialize context", e); - } catch (SerDeException e) { - throw new IOException("Failed to initialize context", e); - } catch (InstantiationException e) { - throw new IOException("Failed to initialize context", e); - } catch (IllegalAccessException e) { - throw new IOException("Failed to initialize context", e); - } catch (HiveException e) { - throw new IOException("Failed to initialize context", e); - } + rowBatchCtx.init(conf, split); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 3992d8c..d714521 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -75,6 +77,16 @@ } catch (Exception e) { throw new RuntimeException(e); } + + // Tell the vectorized ORC reader which columns we expect to read.
+ int nonPartitionColumnCount = rbCtx.getNonPartitionColumnCount(); + String[] rowColumnNames = rbCtx.getRowColumnNames(); + TypeInfo[] rowColumnTypeInfos = rbCtx.getRowColumnTypeInfos(); + if (nonPartitionColumnCount < rowColumnNames.length){ + rowColumnNames = Arrays.copyOf(rowColumnNames, nonPartitionColumnCount); + rowColumnTypeInfos = Arrays.copyOf(rowColumnTypeInfos, nonPartitionColumnCount); + } + reader.setVectorColumnInfo(rowColumnNames, rowColumnTypeInfos); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index e7b9c73..b6aae93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Stack; @@ -140,6 +141,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -303,17 +305,48 @@ public Vectorizer() { supportedAggregationUdfs.add("stddev_samp"); } + + private class VectorTaskColumnInfo { + List columnNames; + List typeInfos; + int partitionColumnCount; + + String[] scratchTypeNameArray; + + VectorTaskColumnInfo() { + partitionColumnCount = 0; + } + + public void setColumnNames(List columnNames) { + this.columnNames = columnNames; + } + public void setTypeInfos(List typeInfos) { + this.typeInfos = typeInfos; + } + public void setPartitionColumnCount(int partitionColumnCount) { + this.partitionColumnCount = partitionColumnCount; + } + public void setScratchTypeNameArray(String[] scratchTypeNameArray) { + this.scratchTypeNameArray = scratchTypeNameArray; + } + + public void transferToBaseWork(BaseWork baseWork) { + String[] columnNameArray = columnNames.toArray(new String[0]); + baseWork.setVectorColumnNames(columnNameArray); + TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]); + baseWork.setVectorColumnTypeInfos(typeInfoArray); + baseWork.setVectorPartitionColumnCount(partitionColumnCount); + + baseWork.setVectorScratchColumnTypeNames(scratchTypeNameArray); + } + } + class VectorizationDispatcher implements Dispatcher { private final PhysicalContext physicalContext; - private List reduceColumnNames; - private List reduceTypeInfos; - public VectorizationDispatcher(PhysicalContext physicalContext) { this.physicalContext = physicalContext; - reduceColumnNames = null; - reduceTypeInfos = null; } @Override @@ -351,9 +384,10 @@ public Object dispatch(Node nd, Stack stack, Object... 
nodeOutputs) } private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { - boolean ret = validateMapWork(mapWork, isTez); + VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeMapWork(mapWork, isTez); + vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez); } } @@ -364,40 +398,227 @@ private void addMapWorkRules(Map opRules, NodeProcessor np) + ReduceSinkOperator.getOperatorName()), np); } - private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException { - LOG.info("Validating MapWork..."); + private TableScanOperator verifyOnlyOneTableScanOperator(MapWork mapWork) { // Eliminate MR plans with more than one TableScanOperator. + LinkedHashMap> aliasToWork = mapWork.getAliasToWork(); if ((aliasToWork == null) || (aliasToWork.size() == 0)) { - return false; + return null; } int tableScanCount = 0; + TableScanOperator tableScanOperator = null; for (Operator op : aliasToWork.values()) { if (op == null) { LOG.warn("Map work has invalid aliases to work with. Fail validation!"); - return false; + return null; } if (op instanceof TableScanOperator) { tableScanCount++; + tableScanOperator = (TableScanOperator) op; } } if (tableScanCount > 1) { LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!"); + return null; + } + return tableScanOperator; + } + + private void getTableSchemaInfo(RowSchema rowSchema, + List columnNames, List typeInfos) { + + // Add all non-virtual columns to make a vectorization context for + // the TableScan operator. + for (ColumnInfo c : rowSchema.getSignature()) { + // Validation will later exclude vectorization of virtual columns usage (HIVE-5560). + if (!isVirtualColumn(c)) { + columnNames.add(c.getInternalName()); + + String typeName = c.getTypeName(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName); + typeInfos.add(typeInfo); + } + } + } + + private String getColumns(List columnNames, int start, int length) { + StringBuilder sb = new StringBuilder(); + for (int i = start; i < start + length; i++) { + if (i > start) { + sb.append(","); + } + sb.append(columnNames.get(i)); + } + return sb.toString(); + } + + private String getTypes(List typeInfos, int start, int length) { + StringBuilder sb = new StringBuilder(); + for (int i = start; i < start + length; i++) { + if (i > start) { + sb.append(":"); + } + sb.append(typeInfos.get(i).getTypeName()); + } + return sb.toString(); + } + + private boolean verifyInputFormat(PartitionDesc pd) { + List> interfaceList = + Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); + if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { + LOG.info("Input format: " + pd.getInputFileFormatClassName() + + ", doesn't provide vectorized input"); return false; } + return true; + } + + private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, + TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) { + + List columnNames = new ArrayList(); + List typeInfos = new ArrayList(); + + getTableSchemaInfo(tableScanOperator.getSchema(), columnNames, typeInfos); + + // Validate input format and schema evolution capability. 
+ + int totalColumnCount = columnNames.size(); + + boolean haveInfo = false; + String nonPartColumns = ""; + String nonPartTypes = ""; + String partColumns = ""; + String partTypes = ""; + int partitionColumnCount = 0; + int nonPartitionColumnCount = 0; // Validate the input format - for (String path : mapWork.getPathToPartitionInfo().keySet()) { - PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path); - List> interfaceList = - Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); - if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { - LOG.info("Input format: " + pd.getInputFileFormatClassName() - + ", doesn't provide vectorized input"); + for (Entry entry : mapWork.getPathToPartitionInfo().entrySet()) { + PartitionDesc pd = entry.getValue(); + if (!verifyInputFormat(pd)) { + return false; + } + if (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) { + // No partition information -- we match because we would default to using the table description. + continue; + } + Properties partProps = pd.getProperties(); + String nextNonPartColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS); + String nextNonPartTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES); + String nextPartColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); + String nextPartTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); + + if (!haveInfo) { + partitionColumnCount = nextPartColumns.split(",").length; + nonPartitionColumnCount = totalColumnCount - partitionColumnCount; + + nonPartColumns = getColumns(columnNames, 0, nonPartitionColumnCount); + nonPartTypes = getTypes(typeInfos, 0, nonPartitionColumnCount); + + partColumns = getColumns(columnNames, nonPartitionColumnCount, partitionColumnCount); + partTypes = getTypes(typeInfos, nonPartitionColumnCount, partitionColumnCount); + + haveInfo = true; + } + + if (!partColumns.equalsIgnoreCase(nextPartColumns)) { + LOG.info( + String.format("Could not vectorize partition %s. Its partition column names %s do not match the other partition column names %s", + entry.getKey(), nextPartColumns, partColumns)); + return false; + } + if (!partTypes.equalsIgnoreCase(nextPartTypes)) { + LOG.info( + String.format("Could not vectorize partition %s. Its partition column types %s do not match the other partition column types %s", + entry.getKey(), nextPartTypes, partTypes)); + return false; + } + + // Column name checks. + + if (nonPartColumns.length() == nextNonPartColumns.length()) { + + // Should be same columns. + if (!nonPartColumns.equalsIgnoreCase(nextNonPartColumns)) { + LOG.info( + String.format("Could not vectorize partition %s. Its column names %s do not match the other column names %s", + entry.getKey(), nextNonPartColumns, nonPartColumns)); + return false; + } + } else if (nonPartColumns.length() > nextNonPartColumns.length()) { + + // Looks like columns were added after the partition was created. + int nextLen = nextNonPartColumns.length(); + String nonPartColumnsSubstr = nonPartColumns.substring(0, nextLen); + if (!nonPartColumnsSubstr.equalsIgnoreCase(nextNonPartColumns) || nonPartColumns.charAt(nextLen) != ',') { + LOG.info( + String.format("Could not vectorize partition %s. The first column names %s do not match the other first column names %s", + entry.getKey(), nextNonPartColumns, nonPartColumnsSubstr)); + return false; + } + } else { + LOG.info( + String.format("Could not vectorize partition %s. 
Its column names %s length longer than the other column names %s length", + entry.getKey(), nextNonPartColumns, nonPartColumns)); + return false; + } + + // Type name checks. + + if (nonPartTypes.length() == nextNonPartTypes.length()) { + + // Should be same columns. + if (!nonPartTypes.equalsIgnoreCase(nextNonPartTypes)) { + LOG.info( + String.format("Could not vectorize partition %s. Its type names %s do not match the other type names %s", + entry.getKey(), nextNonPartTypes, nonPartTypes)); + return false; + } + } else if (nonPartTypes.length() > nextNonPartTypes.length()) { + + // Looks like columns were added after the partition was created. + int nextLen = nextNonPartTypes.length(); + String nonPartTypesSubstr = nonPartTypes.substring(0, nextLen); + if (!nonPartTypesSubstr.equalsIgnoreCase(nextNonPartTypes) || nonPartTypes.charAt(nextLen) != ':') { + LOG.info( + String.format("Could not vectorize partition %s. The first type names %s do not match the other first type names %s", + entry.getKey(), nextNonPartTypes, nonPartTypesSubstr)); + return false; + } + } else { + LOG.info( + String.format("Could not vectorize partition %s. Its type names %s length longer than the other type names %s length", + entry.getKey(), nextNonPartTypes, nonPartTypes)); return false; } } + + vectorTaskColumnInfo.setColumnNames(columnNames); + vectorTaskColumnInfo.setTypeInfos(typeInfos); + vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount); + + return true; + } + + private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) + throws SemanticException { + + LOG.info("Validating MapWork..."); + + TableScanOperator tableScanOperator = verifyOnlyOneTableScanOperator(mapWork); + if (tableScanOperator == null) { + return false; + } + + // This call fills in the column names, types, and partition column count in + // vectorTaskColumnInfo. 
+ if (!validateInputFormatAndSchemaEvolution(mapWork, tableScanOperator, vectorTaskColumnInfo)) { + return false; + } + Map opRules = new LinkedHashMap(); MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez); addMapWorkRules(opRules, vnp); @@ -419,11 +640,14 @@ private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticE return true; } - private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, + boolean isTez) throws SemanticException { + LOG.info("Vectorizing MapWork..."); mapWork.setVectorMode(true); Map opRules = new LinkedHashMap(); - MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez); + MapWorkVectorizationNodeProcessor vnp = + new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -433,9 +657,9 @@ private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticExc HashMap nodeOutput = new HashMap(); ogw.startWalking(topNodes, nodeOutput); - mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap()); - mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap()); - mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap()); + vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames()); + + vectorTaskColumnInfo.transferToBaseWork(mapWork); if (LOG.isDebugEnabled()) { debugDisplayAllMaps(mapWork); @@ -445,13 +669,19 @@ private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticExc } private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { - boolean ret = validateReduceWork(reduceWork); + VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeReduceWork(reduceWork, isTez); + vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez); } } - private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException { + private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException { + + ArrayList reduceColumnNames = new ArrayList(); + ArrayList reduceTypeInfos = new ArrayList(); + try { // Check key ObjectInspector. ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector(); @@ -475,9 +705,6 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws Sema StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; List valueFields = valueStructObjectInspector.getAllStructFieldRefs(); - reduceColumnNames = new ArrayList(); - reduceTypeInfos = new ArrayList(); - for (StructField field: keyFields) { reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." 
+ field.getFieldName()); reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); @@ -489,6 +716,10 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws Sema } catch (Exception e) { throw new SemanticException(e); } + + vectorTaskColumnInfo.setColumnNames(reduceColumnNames); + vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos); + return true; } @@ -497,11 +728,13 @@ private void addReduceWorkRules(Map opRules, NodeProcessor opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np); } - private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException { + private boolean validateReduceWork(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { + LOG.info("Validating ReduceWork..."); // Validate input to ReduceWork. - if (!getOnlyStructObjectInspectors(reduceWork)) { + if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) { return false; } // Now check the reduce operator tree. @@ -525,7 +758,9 @@ private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticExcepti return true; } - private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { + private void vectorizeReduceWork(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { + LOG.info("Vectorizing ReduceWork..."); reduceWork.setVectorMode(true); @@ -534,7 +769,7 @@ private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws Se // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker. Map opRules = new LinkedHashMap(); ReduceWorkVectorizationNodeProcessor vnp = - new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez); + new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez); addReduceWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -549,9 +784,9 @@ private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws Se // Necessary since we are vectorizing the root operator in reduce. reduceWork.setReducer(vnp.getRootVectorOp()); - reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap()); - reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap()); - reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap()); + vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames()); + + vectorTaskColumnInfo.transferToBaseWork(reduceWork); if (LOG.isDebugEnabled()) { debugDisplayAllMaps(reduceWork); @@ -614,23 +849,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // The vectorization context for the Map or Reduce task. protected VectorizationContext taskVectorizationContext; - // The input projection column type name map for the Map or Reduce task. 
- protected Map taskColumnTypeNameMap; - VectorizationNodeProcessor() { - taskColumnTypeNameMap = new HashMap(); - } - - public Map getVectorColumnNameMap() { - return taskVectorizationContext.getProjectionColumnMap(); - } - - public Map getVectorColumnTypeMap() { - return taskColumnTypeNameMap; } - public Map getVectorScratchColumnTypeMap() { - return taskVectorizationContext.getScratchColumnTypeMap(); + public String[] getVectorScratchColumnTypeNames() { + return taskVectorizationContext.getScratchColumnTypeNames(); } protected final Set> opsDone = @@ -700,11 +923,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor { private final MapWork mWork; + private VectorTaskColumnInfo vectorTaskColumnInfo; private final boolean isTez; - public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) { + public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez, + VectorTaskColumnInfo vectorTaskColumnInfo) { super(); this.mWork = mWork; + this.vectorTaskColumnInfo = vectorTaskColumnInfo; this.isTez = isTez; } @@ -718,8 +944,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (op instanceof TableScanOperator) { if (taskVectorizationContext == null) { - taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(), - taskColumnTypeNameMap); + taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo); } vContext = taskVectorizationContext; } else { @@ -760,8 +985,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor { - private final List reduceColumnNames; - private final List reduceTypeInfos; + private VectorTaskColumnInfo vectorTaskColumnInfo; private boolean isTez; @@ -771,11 +995,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return rootVectorOp; } - public ReduceWorkVectorizationNodeProcessor(List reduceColumnNames, - List reduceTypeInfos, boolean isTez) { + public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo, + boolean isTez) { + super(); - this.reduceColumnNames = reduceColumnNames; - this.reduceTypeInfos = reduceTypeInfos; + this.vectorTaskColumnInfo = vectorTaskColumnInfo; rootVectorOp = null; this.isTez = isTez; } @@ -791,15 +1015,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, boolean saveRootVectorOp = false; if (op.getParentOperators().size() == 0) { - LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString()); + LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString()); - vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames); + vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames); taskVectorizationContext = vContext; - int i = 0; - for (TypeInfo typeInfo : reduceTypeInfos) { - taskColumnTypeNameMap.put(i, typeInfo.getTypeName()); - i++; - } + saveRootVectorOp = true; if (LOG.isDebugEnabled()) { @@ -1326,23 +1546,11 @@ private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mo return result; } - private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName, - Map typeNameMap) { + private VectorizationContext getVectorizationContext(String contextName, + VectorTaskColumnInfo 
vectorTaskColumnInfo) { - VectorizationContext vContext = new VectorizationContext(contextName); - - // Add all non-virtual columns to make a vectorization context for - // the TableScan operator. - int i = 0; - for (ColumnInfo c : rowSchema.getSignature()) { - // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560). - if (!isVirtualColumn(c)) { - vContext.addInitialColumn(c.getInternalName()); - typeNameMap.put(i, c.getTypeName()); - i++; - } - } - vContext.finishedAddingInitialColumns(); + // UNDONE: And, the types??? + VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames); return vContext; } @@ -1700,12 +1908,14 @@ private boolean isVirtualColumn(ColumnInfo column) { public void debugDisplayAllMaps(BaseWork work) { - Map columnNameMap = work.getVectorColumnNameMap(); - Map columnTypeMap = work.getVectorColumnTypeMap(); - Map scratchColumnTypeMap = work.getVectorScratchColumnTypeMap(); + String[] columnNames = work.getVectorColumnNames(); + Object columnTypeInfos = work.getVectorColumnTypeInfos(); + int partitionColumnCount = work.getVectorPartitionColumnCount(); + String[] scratchColumnTypeNames = work.getVectorScratchColumnTypeNames(); - LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString()); - LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString()); - LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString()); + LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames)); + LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos)); + LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount); + LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames)); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index a342738..0441a6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.hive.ql.plan.Explain.Level; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; /** @@ -61,9 +62,11 @@ public BaseWork(String name) { // Vectorization. 
- protected Map vectorColumnNameMap; - protected Map vectorColumnTypeMap; - protected Map vectorScratchColumnTypeMap; + protected String[] vectorColumnNames; + protected TypeInfo[] vectorColumnTypeInfos; + protected int vectorPartitionColumnCount; + + protected String[] vectorScratchColumnTypeNames; public void setGatheringStats(boolean gatherStats) { this.gatheringStats = gatherStats; @@ -145,30 +148,42 @@ public void addDummyOp(HashTableDummyOperator dummyOp) { return returnSet; } - public Map getVectorColumnNameMap() { - return vectorColumnNameMap; + // ----------------------------------------------------------------------------------------------- + + public String[] getVectorColumnNames() { + return vectorColumnNames; + } + + public void setVectorColumnNames(String[] vectorColumnNames) { + this.vectorColumnNames = vectorColumnNames; } - public void setVectorColumnNameMap(Map vectorColumnNameMap) { - this.vectorColumnNameMap = vectorColumnNameMap; + public TypeInfo[] getVectorColumnTypeInfos() { + return vectorColumnTypeInfos; } - public Map getVectorColumnTypeMap() { - return vectorColumnTypeMap; + public void setVectorColumnTypeInfos(TypeInfo[] vectorColumnTypeInfos) { + this.vectorColumnTypeInfos = vectorColumnTypeInfos; } - public void setVectorColumnTypeMap(Map vectorColumnTypeMap) { - this.vectorColumnTypeMap = vectorColumnTypeMap; + public int getVectorPartitionColumnCount() { + return vectorPartitionColumnCount; } - public Map getVectorScratchColumnTypeMap() { - return vectorScratchColumnTypeMap; + public void setVectorPartitionColumnCount(int vectorPartitionColumnCount) { + this.vectorPartitionColumnCount = vectorPartitionColumnCount; } - public void setVectorScratchColumnTypeMap(Map vectorScratchColumnTypeMap) { - this.vectorScratchColumnTypeMap = vectorScratchColumnTypeMap; + public String[] getVectorScratchColumnTypeNames() { + return vectorScratchColumnTypeNames; } + public void setVectorScratchColumnTypeNames(String[] vectorScratchColumnTypeNames) { + this.vectorScratchColumnTypeNames = vectorScratchColumnTypeNames; + } + + // ----------------------------------------------------------------------------------------------- + /** * @return the mapredLocalWork */ diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java index 0f8712e..c076e6c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorRowObject.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import junit.framework.TestCase; @@ -50,13 +51,13 @@ void examineBatch(VectorizedRowBatch batch, VectorExtractRowSameBatch vectorExtr void testVectorRowObject(int caseNum, Random r) throws HiveException { - Map emptyScratchMap = new HashMap(); + String[] emptyScratchTypeNames = new String[0]; RandomRowObjectSource source = new RandomRowObjectSource(); source.init(r); VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx(); - batchContext.init(emptyScratchMap, source.rowStructObjectInspector()); + batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames); VectorizedRowBatch batch = batchContext.createVectorizedRowBatch(); VectorAssignRowSameBatch vectorAssignRow = new VectorAssignRowSameBatch(); diff --git 
ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java index 23e44f0..d3dc30d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.hive.serde2.fast.SerializeWrite; import org.apache.hadoop.io.BooleanWritable; @@ -331,13 +332,13 @@ void serializeBatch(VectorizedRowBatch batch, VectorSerializeRow vectorSerialize void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { - Map emptyScratchMap = new HashMap(); + String[] emptyScratchTypeNames = new String[0]; RandomRowObjectSource source = new RandomRowObjectSource(); source.init(r); VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx(); - batchContext.init(emptyScratchMap, source.rowStructObjectInspector()); + batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames); VectorizedRowBatch batch = batchContext.createVectorizedRowBatch(); VectorAssignRowSameBatch vectorAssignRow = new VectorAssignRowSameBatch(); @@ -563,13 +564,13 @@ private LazySerDeParameters getSerDeParams(StructObjectInspector rowObjectInspec void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { - Map emptyScratchMap = new HashMap(); + String[] emptyScratchTypeNames = new String[0]; RandomRowObjectSource source = new RandomRowObjectSource(); source.init(r); VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx(); - batchContext.init(emptyScratchMap, source.rowStructObjectInspector()); + batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames); VectorizedRowBatch batch = batchContext.createVectorizedRowBatch(); int fieldCount = source.typeNames().size(); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java deleted file mode 100644 index 473ebac..0000000 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java +++ /dev/null @@ -1,355 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.io.File; -import java.io.IOException; -import java.sql.Timestamp; -import java.util.Arrays; -import java.util.Calendar; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.RCFile; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Class that tests the functionality of VectorizedRowBatchCtx. 
- */ -public class TestVectorizedRowBatchCtx { - - private Configuration conf; - private FileSystem fs; - private Path testFilePath; - private int colCount; - private ColumnarSerDe serDe; - private Properties tbl; - - @Before - public void openFileSystem() throws Exception { - conf = new Configuration(); - fs = FileSystem.getLocal(conf); - Path workDir = new Path(System.getProperty("test.tmp.dir", - "target" + File.separator + "test" + File.separator + "tmp")); - fs.setWorkingDirectory(workDir); - testFilePath = new Path("TestVectorizedRowBatchCtx.testDump.rc"); - fs.delete(testFilePath, false); - } - - private void initSerde() { - tbl = new Properties(); - - // Set the configuration parameters - tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "6"); - tbl.setProperty("columns", - "ashort,aint,along,adouble,afloat,astring,abyte,aboolean,atimestamp"); - tbl.setProperty("columns.types", - "smallint:int:bigint:double:float:string:tinyint:boolean:timestamp"); - colCount = 9; - tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL"); - - try { - serDe = new ColumnarSerDe(); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - } catch (SerDeException e) { - throw new RuntimeException(e); - } - } - - private void WriteRCFile(FileSystem fs, Path file, Configuration conf) - throws IOException, SerDeException { - fs.delete(file, true); - - RCFileOutputFormat.setColumnNumber(conf, colCount); - RCFile.Writer writer = - new RCFile.Writer(fs, conf, file, null, null, - new DefaultCodec()); - - for (int i = 0; i < 10; ++i) { - BytesRefArrayWritable bytes = new BytesRefArrayWritable(colCount); - BytesRefWritable cu; - - if (i % 3 != 0) { - //if (i < 100) { - cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length); - bytes.set(0, cu); - - cu = new BytesRefWritable((i + 100 + "").getBytes("UTF-8"), 0, - (i + 100 + "").getBytes("UTF-8").length); - bytes.set(1, cu); - - cu = new BytesRefWritable((i + 200 + "").getBytes("UTF-8"), 0, - (i + 200 + "").getBytes("UTF-8").length); - bytes.set(2, cu); - - cu = new BytesRefWritable((i + 1.23 + "").getBytes("UTF-8"), 0, - (i + 1.23 + "").getBytes("UTF-8").length); - bytes.set(3, cu); - - cu = new BytesRefWritable((i + 2.23 + "").getBytes("UTF-8"), 0, - (i + 2.23 + "").getBytes("UTF-8").length); - bytes.set(4, cu); - - cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0, - ("Test string").getBytes("UTF-8").length); - bytes.set(5, cu); - - cu = new BytesRefWritable((1 + "").getBytes("UTF-8"), 0, - (1 + "").getBytes("UTF-8").length); - bytes.set(6, cu); - - cu = new BytesRefWritable(("true").getBytes("UTF-8"), 0, - ("true").getBytes("UTF-8").length); - bytes.set(7, cu); - - Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime()); - cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0, - t.toString().getBytes("UTF-8").length); - bytes.set(8, cu); - - } else { - cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length); - bytes.set(0, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(1, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(2, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(3, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(4, cu); - - cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0, - ("Test string").getBytes("UTF-8").length); - bytes.set(5, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(6, cu); - - cu = new BytesRefWritable(new byte[0], 
0, 0); - bytes.set(7, cu); - -// cu = new BytesRefWritable(new byte[0], 0, 0); -// bytes.set(8, cu); - Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime()); - cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0, - t.toString().getBytes("UTF-8").length); - bytes.set(8, cu); - } - writer.append(bytes); - } - writer.close(); - } - - private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException { - - RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf); - DataOutputBuffer buffer = new DataOutputBuffer(); - - // Get object inspector - StructObjectInspector oi = (StructObjectInspector) serDe - .getObjectInspector(); - List fieldRefs = oi.getAllStructFieldRefs(); - - Assert.assertEquals("Field size should be 9", colCount, fieldRefs.size()); - - // Create the context - VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null, null); - VectorizedRowBatch batch = ctx.createVectorizedRowBatch(); - VectorizedBatchUtil.setNoNullFields(batch); - - // Iterate thru the rows and populate the batch - LongWritable rowID = new LongWritable(); - for (int i = 0; i < 10; i++) { - reader.next(rowID); - BytesRefArrayWritable cols = new BytesRefArrayWritable(); - reader.getCurrentRow(cols); - cols.resetValid(colCount); - ctx.addRowToBatch(i, cols, batch, buffer); - } - reader.close(); - batch.size = 10; - return batch; - } - - void ValidateRowBatch(VectorizedRowBatch batch) throws IOException, SerDeException { - - LongWritable rowID = new LongWritable(); - RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf); - for (int i = 0; i < batch.size; i++) { - reader.next(rowID); - BytesRefArrayWritable cols = new BytesRefArrayWritable(); - reader.getCurrentRow(cols); - cols.resetValid(colCount); - Object row = serDe.deserialize(cols); - - StructObjectInspector oi = (StructObjectInspector) serDe - .getObjectInspector(); - List fieldRefs = oi.getAllStructFieldRefs(); - - for (int j = 0; j < fieldRefs.size(); j++) { - Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j)); - ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector(); - - // Vectorization only supports PRIMITIVE data types. Assert the same - Assert.assertEquals(true, foi.getCategory() == Category.PRIMITIVE); - - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - Object writableCol = poi.getPrimitiveWritableObject(fieldData); - if (writableCol != null) { - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == (((BooleanWritable) writableCol).get() ? 
1 - : 0)); - } - break; - case BYTE: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == (long) ((ByteWritable) writableCol).get()); - } - break; - case SHORT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((ShortWritable) writableCol).get()); - } - break; - case INT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((IntWritable) writableCol).get()); - } - break; - case LONG: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((LongWritable) writableCol).get()); - } - break; - case FLOAT: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j]; - Assert.assertEquals(true, dcv.vector[i] == ((FloatWritable) writableCol).get()); - } - break; - case DOUBLE: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j]; - Assert.assertEquals(true, dcv.vector[i] == ((DoubleWritable) writableCol).get()); - } - break; - case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[j]; - BytesWritable colBinary = (BytesWritable) writableCol; - BytesWritable batchBinary = (BytesWritable) bcv.getWritableObject(i); - byte[] a = colBinary.getBytes(); - byte[] b = batchBinary.getBytes(); - Assert.assertEquals(true, Arrays.equals(a, b)); - } - break; - case STRING: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[j]; - Text colText = (Text) writableCol; - Text batchText = (Text) bcv.getWritableObject(i); - String a = colText.toString(); - String b = batchText.toString(); - Assert.assertEquals(true, a.equals(b)); - } - break; - case TIMESTAMP: { - LongColumnVector tcv = (LongColumnVector) batch.cols[j]; - Timestamp t = ((TimestampWritable) writableCol).getTimestamp(); - long timeInNanoSec = (t.getTime() * 1000000) + (t.getNanos() % 1000000); - Assert.assertEquals(true, tcv.vector[i] == timeInNanoSec); - } - break; - default: - Assert.assertTrue("Unknown type", false); - } - } else { - Assert.assertEquals(true, batch.cols[j].isNull[i]); - } - } - - // Check repeating - Assert.assertEquals(false, batch.cols[0].isRepeating); - Assert.assertEquals(false, batch.cols[1].isRepeating); - Assert.assertEquals(false, batch.cols[2].isRepeating); - Assert.assertEquals(false, batch.cols[3].isRepeating); - Assert.assertEquals(false, batch.cols[4].isRepeating); - - // Check non null - Assert.assertEquals(true, batch.cols[0].noNulls); - Assert.assertEquals(false, batch.cols[1].noNulls); - Assert.assertEquals(false, batch.cols[2].noNulls); - Assert.assertEquals(false, batch.cols[3].noNulls); - Assert.assertEquals(false, batch.cols[4].noNulls); - } - reader.close(); - } - - @Test - public void TestCtx() throws Exception { - initSerde(); - WriteRCFile(this.fs, this.testFilePath, this.conf); - VectorizedRowBatch batch = GetRowBatch(); - ValidateRowBatch(batch); - - // Test VectorizedColumnarSerDe - VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe(); - SerDeUtils.initializeSerDe(vcs, this.conf, tbl, null); - Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe - .getObjectInspector()); - BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) ((ObjectWritable) w).get(); - vcs.deserializeVector(refArray, 10, batch); - ValidateRowBatch(batch); - } -}
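
A note on the column-array layout this patch introduces: the per-task vector column names and TypeInfos keep the non-partition (file) columns first and the partition columns at the tail, with the new partition-column count recording the split, which is why the reader hunk near the top of this section can hand the reader a truncated copy via Arrays.copyOf. The following standalone sketch only illustrates that convention; the class and method names (ColumnInfoTrim, nonPartitionSlice) and the sample column names are invented for the example and are not part of the patch.

import java.util.Arrays;

public class ColumnInfoTrim {

  // Returns only the leading non-partition columns, mirroring the Arrays.copyOf
  // truncation done before reader.setVectorColumnInfo(...) in the hunk above.
  static String[] nonPartitionSlice(String[] rowColumnNames, int partitionColumnCount) {
    int nonPartitionColumnCount = rowColumnNames.length - partitionColumnCount;
    if (nonPartitionColumnCount < rowColumnNames.length) {
      return Arrays.copyOf(rowColumnNames, nonPartitionColumnCount);
    }
    return rowColumnNames;
  }

  public static void main(String[] args) {
    // "ds" stands in for a partition column kept at the tail of the array.
    String[] rowColumnNames = {"id", "name", "amount", "ds"};
    System.out.println(Arrays.toString(nonPartitionSlice(rowColumnNames, 1)));
    // prints: [id, name, amount]
  }
}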
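
The schema-evolution check added in validateInputFormatAndSchemaEvolution treats a partition as compatible when its comma-separated column-name list (and, analogously, its colon-separated type-name list) is either identical to the table's list or a strict prefix of it that ends exactly on a separator, the case where columns were appended to the table after the partition was created. A minimal standalone sketch of that prefix rule, with invented names (ColumnPrefixCheck, partitionColumnsCompatible), is shown below; it mirrors the logic in the patch but is not taken from it.

public class ColumnPrefixCheck {

  // Accepts the partition when its column list equals the table's list, or when it is
  // a strict prefix of the table's list ending exactly at a separator (columns were
  // appended to the table after the partition was created).
  static boolean partitionColumnsCompatible(String tableColumns, String partitionColumns,
      char separator) {
    if (tableColumns.length() == partitionColumns.length()) {
      // Same length: must be the same columns (case-insensitive, as in the patch).
      return tableColumns.equalsIgnoreCase(partitionColumns);
    }
    if (tableColumns.length() > partitionColumns.length()) {
      int len = partitionColumns.length();
      // The separator check prevents "a,bc" from matching the prefix "a,b".
      return tableColumns.substring(0, len).equalsIgnoreCase(partitionColumns)
          && tableColumns.charAt(len) == separator;
    }
    // The partition lists more columns than the table: rejected, as in the patch.
    return false;
  }

  public static void main(String[] args) {
    System.out.println(partitionColumnsCompatible("a,b,c", "a,b", ','));   // true
    System.out.println(partitionColumnsCompatible("a,bc,d", "a,b", ','));  // false
    System.out.println(partitionColumnsCompatible("a,b", "a,b,c", ','));   // false
  }
}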
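
At a higher level, the patch replaces the Integer-keyed maps previously stored on BaseWork (vectorColumnNameMap, vectorColumnTypeMap, vectorScratchColumnTypeMap) with dense arrays plus a partition-column count. For code that still holds the old index-keyed map form, converting to the new array form amounts to the sketch below; the helper (MapToArray, toDenseArray) is hypothetical, assumes contiguous keys starting at 0, and is shown only to make the shape of the refactor concrete.

import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class MapToArray {

  // Flattens an index-keyed map (the old BaseWork representation) into a dense
  // array (the new representation). Assumes the keys are exactly 0..size-1.
  static String[] toDenseArray(Map<Integer, String> byColumnIndex) {
    String[] result = new String[byColumnIndex.size()];
    for (Map.Entry<Integer, String> e : new TreeMap<>(byColumnIndex).entrySet()) {
      result[e.getKey()] = e.getValue();
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Integer, String> scratchTypes = new TreeMap<>();
    scratchTypes.put(0, "bigint");
    scratchTypes.put(1, "double");
    System.out.println(Arrays.toString(toDenseArray(scratchTypes)));
    // prints: [bigint, double]
  }
}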