diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 03bd873..0a8c6b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -64,6 +65,10 @@
   // Hash map of partition values. Key=TblColName value=PartitionValue
   private LinkedHashMap<String, String> partitionValues;
 
+  // Column projection list - list of column indexes to include. This
+  // list does not contain partition columns.
+  private List<Integer> colsToInclude;
+
   /**
    * Constructor for VectorizedRowBatchCtx
    *
@@ -106,7 +111,7 @@ public VectorizedRowBatchCtx() {
    * @throws IllegalAccessException
    * @throws HiveException
    */
-  public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
+  public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
       IOException,
       SerDeException,
       InstantiationException,
@@ -158,7 +163,16 @@ public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
       for (int i = 0; i < partKeys.length; i++) {
         String key = partKeys[i];
         partNames.add(key);
-        partitionValues.put(key, partSpec.get(key));
+        if (partSpec == null) {
+          // For a partitionless table, initialize partValue to an empty string.
+          // We can have a partitionless table even if we have partition keys,
+          // when there is only one partition selected and the partition key is not
+          // part of the projection/include list.
+          partitionValues.put(key, "");
+        } else {
+          partitionValues.put(key, partSpec.get(key));
+        }
+
         partObjectInspectors
             .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
       }
@@ -179,6 +193,8 @@ public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
       rowOI = partRawRowObjectInspector;
       rawRowOI = partRawRowObjectInspector;
     }
+
+    colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf);
   }
 
   /**
@@ -187,48 +203,54 @@ public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
    * @return VectorizedRowBatch
    * @throws HiveException
    */
-  public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException
+  public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
   {
     List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
     VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
 
     for (int j = 0; j < fieldRefs.size(); j++) {
-      ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
-      switch (foi.getCategory()) {
-      case PRIMITIVE: {
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-        // Vectorization currently only supports the following data types:
-        // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING and TIMESTAMP
-        switch (poi.getPrimitiveCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case TIMESTAMP:
-          result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-          break;
-        case FLOAT:
-        case DOUBLE:
-          result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+      // If the column is included in the include list or if the column is a
+      // partition column then create the column vector. Also note that partition columns are not
+      // in the included list.
+      if ((colsToInclude == null) || colsToInclude.contains(j)
+          || ((partitionValues != null) && (partitionValues.get(fieldRefs.get(j).getFieldName()) != null))) {
+        ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
+        switch (foi.getCategory()) {
+        case PRIMITIVE: {
+          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+          // Vectorization currently only supports the following data types:
+          // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING and TIMESTAMP
+          switch (poi.getPrimitiveCategory()) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+          case TIMESTAMP:
+            result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            break;
+          case FLOAT:
+          case DOUBLE:
+            result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            break;
+          case STRING:
+            result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            break;
+          default:
+            throw new RuntimeException("Vectorizaton is not supported for datatype:"
+                + poi.getPrimitiveCategory());
+          }
           break;
-        case STRING:
-          result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-          break;
-        default:
-          throw new RuntimeException("Vectorizaton is not supported for datatype:"
-              + poi.getPrimitiveCategory());
         }
-        break;
-      }
-      case LIST:
-      case MAP:
-      case STRUCT:
-      case UNION:
-        throw new HiveException("Vectorizaton is not supported for datatype:"
-            + foi.getCategory());
-      default:
-        throw new HiveException("Unknown ObjectInspector category!");
+        case LIST:
+        case MAP:
+        case STRUCT:
+        case UNION:
+          throw new HiveException("Vectorizaton is not supported for datatype:"
+              + foi.getCategory());
+        default:
+          throw new HiveException("Unknown ObjectInspector category!");
+        }
       }
     }
     result.numCols = fieldRefs.size();
@@ -247,7 +269,7 @@ public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException
    * @throws HiveException
    * @throws SerDeException
    */
-  public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
+  public void addRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
       throws HiveException, SerDeException
   {
     Object row = this.deserializer.deserialize(rowBlob);
@@ -263,7 +285,7 @@ public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch bat
    *          Vectorized row batch which contains deserialized data
    * @throws SerDeException
    */
-  public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
+  public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
       VectorizedRowBatch batch) throws SerDeException
   {
 
@@ -275,7 +297,7 @@ public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
     }
   }
 
-  private int GetColIndexBasedOnColName(String colName) throws HiveException
+  private int getColIndexBasedOnColName(String colName) throws HiveException
   {
     List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
     for (int i = 0; i < fieldRefs.size(); i++) {
@@ -292,14 +314,14 @@ private int GetColIndexBasedOnColName(String colName) throws HiveException
    * @param batch
    * @throws HiveException
    */
-  public void AddPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
+  public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
   {
     int colIndex;
     String value;
     BytesColumnVector bcv;
     if (partitionValues != null) {
       for (String key : partitionValues.keySet()) {
-        colIndex = GetColIndexBasedOnColName(key);
+        colIndex = getColIndexBasedOnColName(key);
         value = partitionValues.get(key);
         bcv = (BytesColumnVector) batch.cols[colIndex];
         bcv.setRef(0, value.getBytes(), 0, value.length());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
index 5fb4b69..bc19fb4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
@@ -126,7 +126,7 @@ public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
     more = start < end;
     try {
       rbCtx = new VectorizedRowBatchCtx();
-      rbCtx.Init(conf, split);
+      rbCtx.init(conf, split);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -149,7 +149,7 @@ public NullWritable createKey() {
   public VectorizedRowBatch createValue() {
     VectorizedRowBatch result = null;
     try {
-      result = rbCtx.CreateVectorizedRowBatch();
+      result = rbCtx.createVectorizedRowBatch();
     } catch (HiveException e) {
       new RuntimeException("Error creating a batch", e);
     }
@@ -181,13 +181,13 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
           // CombineHiveRecordReader and as this does not call CreateValue() for
           // each new RecordReader it creates, this check is required in next()
           if (addPartitionCols) {
-            rbCtx.AddPartitionColsToBatch(value);
+            rbCtx.addPartitionColsToBatch(value);
             addPartitionCols = false;
           }
           in.getCurrentRow(colsCache);
           // Currently RCFile reader does not support reading vectorized
           // data. Populating the batch by adding one row at a time.
-          rbCtx.AddRowToBatch(i, (Writable) colsCache, value);
+          rbCtx.addRowToBatch(i, (Writable) colsCache, value);
         } else {
           break;
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 211b7e5..5684e77 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -66,7 +66,7 @@
 
     try {
       rbCtx = new VectorizedRowBatchCtx();
-      rbCtx.Init(conf, fileSplit);
+      rbCtx.init(conf, fileSplit);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -85,13 +85,13 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       // as this does not call CreateValue for each new RecordReader it creates, this check is
      // required in next()
       if (addPartitionCols) {
-        rbCtx.AddPartitionColsToBatch(value);
+        rbCtx.addPartitionColsToBatch(value);
         addPartitionCols = false;
       }
       reader.nextBatch(value);
-      rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
+      rbCtx.convertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
     } catch (Exception e) {
-      new RuntimeException(e);
+      throw new RuntimeException(e);
     }
     progress = reader.getProgress();
     return true;
@@ -106,9 +106,9 @@ public NullWritable createKey() {
   public VectorizedRowBatch createValue() {
     VectorizedRowBatch result = null;
     try {
-      result = rbCtx.CreateVectorizedRowBatch();
+      result = rbCtx.createVectorizedRowBatch();
     } catch (HiveException e) {
-      new RuntimeException("Error creating a batch", e);
+      throw new RuntimeException("Error creating a batch", e);
     }
     return result;
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
index 78ebb17..9a56c52 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
@@ -203,7 +203,7 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I
 
     // Create the context
     VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null);
-    VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch();
+    VectorizedRowBatch batch = ctx.createVectorizedRowBatch();
     VectorizedBatchUtil.SetNoNullFields(true, batch);
 
     // Iterate thru the rows and populate the batch
@@ -213,7 +213,7 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I
       BytesRefArrayWritable cols = new BytesRefArrayWritable();
       reader.getCurrentRow(cols);
       cols.resetValid(colCount);
-      ctx.AddRowToBatch(i, cols, batch);
+      ctx.addRowToBatch(i, cols, batch);
     }
     reader.close();
     batch.size = 10;
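
Illustrative sketch, not part of the patch: the snippet below shows how the renamed lower-camelCase VectorizedRowBatchCtx API fits together from a record reader's point of view, following the call order used by VectorizedRCFileRecordReader above. The class name VectorizedReadSketch and the conf/split/cols arguments are assumed for the example; only the VectorizedRowBatchCtx calls come from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.mapred.FileSplit;

public class VectorizedReadSketch {
  public VectorizedRowBatch readOneRow(Configuration conf, FileSplit split,
      BytesRefArrayWritable cols) throws Exception {
    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
    // init() now also picks up the read-column projection from the conf
    // (ColumnProjectionUtils.getReadColumnIDs).
    rbCtx.init(conf, split);
    // Column vectors are allocated only for projected columns and partition columns.
    VectorizedRowBatch batch = rbCtx.createVectorizedRowBatch();
    // Partition values are constant for the split, so they are set once per batch.
    rbCtx.addPartitionColsToBatch(batch);
    // Row-at-a-time population, as in the RCFile reader above.
    rbCtx.addRowToBatch(0, cols, batch);
    batch.size = 1;
    return batch;
  }
}

The ORC path above follows the same sequence, except that reader.nextBatch(value) plus convertRowBatchBlobToVectorizedBatch replace the per-row addRowToBatch call.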