diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 217a40c..1577827 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -55,6 +55,7 @@
   // used to log memory usage periodically
   public static MemoryMXBean memoryMXBean;
   protected boolean isLogInfoEnabled = false;
+  protected boolean isLogTraceEnabled = false;
   protected MRTaskReporter reporter;
 
   private long numRows = 0;
@@ -85,6 +86,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
     isLogInfoEnabled = l4j.isInfoEnabled();
+    isLogTraceEnabled = l4j.isTraceEnabled();
 
     //log classpaths
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index b5607a4..7fc5c90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,6 +39,10 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -49,7 +54,10 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -61,8 +69,6 @@
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
-import com.google.common.collect.Lists;
-
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
@@ -92,8 +98,20 @@
   private ReduceWork redWork;
 
+  private boolean vectorized = false;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  private DataOutputBuffer buffer;
+  private VectorizedRowBatch[] batches;
+  // number of columns pertaining to keys in a vectorized row batch
+  private int keysColumnOffset;
+  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  private StructObjectInspector keyStructInspector;
+  private StructObjectInspector[] valueStructInspectors;
+  /* this is only used in the error code path */
+  private List<VectorExpressionWriter>[] valueStringWriters;
+
   @Override
   void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
@@ -118,6 +136,8 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
     reducer.setParentOperators(null); // clear out any parents as reducer is the
                                       // root
     isTagged = redWork.getNeedsTagging();
+    vectorized = redWork.getVectorModeOn() != null;
+
     try {
       keyTableDesc = redWork.getKeyDesc();
       inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
@@ -126,6 +146,17 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
       reducer.setGroupKeyObjectInspector(keyObjectInspector);
       valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
+
+      if(vectorized) {
+        final int maxTags = redWork.getTagToValueDesc().size();
+        keyStructInspector = (StructObjectInspector)keyObjectInspector;
+        batches = new VectorizedRowBatch[maxTags];
+        valueStructInspectors = new StructObjectInspector[maxTags];
+        valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags];
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
+      }
+
       for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
         valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
@@ -141,6 +172,23 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
         ois.add(valueObjectInspector[tag]);
         rowObjectInspector[tag] = ObjectInspectorFactory
             .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+
+        if(vectorized) {
+          /* vectorization only works with struct object inspectors */
+          valueStructInspectors[tag] = (StructObjectInspector)valueObjectInspector[tag];
+
+          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+              valueStructInspectors[tag]);
+          final int totalColumns = keysColumnOffset +
+              valueStructInspectors[tag].getAllStructFieldRefs().size();
+          valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
+          valueStringWriters[tag].addAll(Arrays
+              .asList(VectorExpressionWriterFactory
+                  .genVectorStructExpressionWritables(keyStructInspector)));
+          valueStringWriters[tag].addAll(Arrays
+              .asList(VectorExpressionWriterFactory
+                  .genVectorStructExpressionWritables(valueStructInspectors[tag])));
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -220,7 +268,7 @@ void run() throws Exception {
       while(kvsReader.next()){
         Object key = kvsReader.getCurrentKey();
         Iterable<Object> values = kvsReader.getCurrentValues();
-        boolean needMore = processKeyValues(key, values);
+        boolean needMore = processRows(key, values);
         if(!needMore){
           break;
         }
@@ -248,7 +296,7 @@ void run() throws Exception {
    * @param values
    * @return true if it is not done and can take more inputs
    */
-  private boolean processKeyValues(Object key, Iterable<Object> values) {
+  private boolean processRows(Object key, Iterable<Object> values) {
     if(reducer.getDone()){
       //done - no more records needed
       return false;
@@ -259,81 +307,51 @@ private boolean processKeyValues(Object key, Iterable<Object> values) {
     try {
       BytesWritable keyWritable = (BytesWritable) key;
-      byte tag = 0;
+      byte tag = 0;
+
       if (isTagged) {
         // remove the tag from key coming out of reducer
         // and store it in separate variable.
-        int size = keyWritable.getSize() - 1;
-        tag = keyWritable.get()[size];
+        int size = keyWritable.getLength() - 1;
+        tag = keyWritable.getBytes()[size];
         keyWritable.setSize(size);
       }
       //Set the key, check if this is a new group or same group
-      if (!keyWritable.equals(groupKey)) {
+      if (!keyWritable.equals(this.groupKey)) {
         // If a operator wants to do some work at the beginning of a group
         if (groupKey == null) { // the first group
-          groupKey = new BytesWritable();
+          this.groupKey = new BytesWritable();
         } else {
           // If a operator wants to do some work at the end of a group
-          l4j.trace("End Group");
+          if(isLogTraceEnabled) {
+            l4j.trace("End Group");
+          }
           reducer.endGroup();
         }
         try {
-          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+          this.keyObject = inputKeyDeserializer.deserialize(keyWritable);
         } catch (Exception e) {
           throw new HiveException(
               "Hive Runtime Error: Unable to deserialize reduce input key from "
-              + Utilities.formatBinaryString(keyWritable.get(), 0,
-              keyWritable.getSize()) + " with properties "
+              + Utilities.formatBinaryString(keyWritable.getBytes(), 0,
+              keyWritable.getLength()) + " with properties "
              + keyTableDesc.getProperties(), e);
         }
-
-        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
-        l4j.trace("Start Group");
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        if (isLogTraceEnabled) {
+          l4j.trace("Start Group");
+        }
         reducer.setGroupKeyObject(keyObject);
         reducer.startGroup();
       }
-
-      //process all the values we have for this key
-      Iterator<Object> valuesIt = values.iterator();
-      while (valuesIt.hasNext()) {
-        BytesWritable valueWritable = (BytesWritable) valuesIt.next();
-        Object valueObj;
-        try {
-          valueObj = inputValueDeserializer[tag].deserialize(valueWritable);
-        } catch (SerDeException e) {
-          throw new HiveException(
-              "Hive Runtime Error: Unable to deserialize reduce input value (tag="
-              + tag
-              + ") from "
-              + Utilities.formatBinaryString(valueWritable.get(), 0,
-              valueWritable.getSize()) + " with properties "
-              + valueTableDesc[tag].getProperties(), e);
-        }
-        row.clear();
-        row.add(keyObject);
-        row.add(valueObj);
-
-        try {
-          reducer.processOp(row, tag);
-        } catch (Exception e) {
-          String rowString = null;
-          try {
-            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
-          } catch (Exception e2) {
-            rowString = "[Error getting row data with exception " +
-                StringUtils.stringifyException(e2) + " ]";
-          }
-          throw new HiveException("Hive Runtime Error while processing row (tag="
-              + tag + ") " + rowString, e);
-        }
-        if (isLogInfoEnabled) {
-          logProgress();
-        }
+      /* this.keyObject passed via reference */
+      if(vectorized) {
+        return processVectors(values, tag);
+      } else {
+        return processKeyValues(values, tag);
       }
-
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -344,9 +362,113 @@ private boolean processKeyValues(Object key, Iterable<Object> values) {
         throw new RuntimeException(e);
       }
     }
+  }
+
+  private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
+    try {
+      return inputValueDeserializer[tag].deserialize(valueWritable);
+    } catch (SerDeException e) {
+      throw new HiveException(
+          "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+          + tag
+          + ") from "
+          + Utilities.formatBinaryString(valueWritable.getBytes(), 0,
+          valueWritable.getLength()) + " with properties "
+          + valueTableDesc[tag].getProperties(), e);
+    }
+  }
+
+  /**
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveException {
+
+    for (Object value : values) {
+      BytesWritable valueWritable = (BytesWritable) value;
+
+      row.clear();
+      row.add(this.keyObject);
+      row.add(deserializeValue(valueWritable, tag));
+
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
+        try {
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception " +
+              StringUtils.stringifyException(e2) + " ]";
+        }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
+      }
+      if (isLogInfoEnabled) {
+        logProgress();
+      }
+    }
     return true; //give me more
   }
 
+  /**
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processVectors(Iterable<Object> values, byte tag) throws HiveException {
+    VectorizedRowBatch batch = batches[tag];
+    batch.reset();
+
+    /* deserialize key into columns */
+    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
+        0, 0, batch, buffer);
+    for(int i = 0; i < keysColumnOffset; i++) {
+      VectorizedBatchUtil.setRepeatingColumn(batch, i);
+    }
+
+    int rowIdx = 0;
+    try {
+      for (Object value : values) {
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) value;
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
+            rowIdx, keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+          if (isLogInfoEnabled) {
+            logProgress();
+          }
+        }
+      }
+      if (rowIdx > 0) {
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
+      }
+      if (isLogInfoEnabled) {
+        logProgress();
+      }
+    } catch (Exception e) {
+      String rowString = null;
+      try {
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters[tag]
+            .toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception " +
+            StringUtils.stringifyException(e2) + " ]";
+      }
+      throw new HiveException("Hive Runtime Error while processing vector batch (tag="
+          + tag + ") " + rowString, e);
+    }
+    return true; // give me more
+  }
+
   @Override
   void close(){
     // check if there are IOExceptions
@@ -357,7 +479,9 @@ void close(){
     try {
       if (groupKey != null) {
         // If a operator wants to do some work at the end of a group
-        l4j.trace("End Group");
+        if(isLogTraceEnabled) {
+          l4j.trace("End Group");
+        }
         reducer.endGroup();
       }
       if (isLogInfoEnabled) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 8c299a6..064b94e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.sql.Timestamp;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.FloatWritable;
@@ -69,19 +71,156 @@ public static void setNoNullFields(VectorizedRowBatch batch) {
   }
 
   /**
+   * Iterates thru all the column vectors and sets repeating to
+   * specified column.
+   *
+   */
+  public static void setRepeatingColumn(VectorizedRowBatch batch, int column) {
+    ColumnVector cv = batch.cols[column];
+    cv.isRepeating = true;
+  }
+
+  /**
+   * Reduce the batch size for a vectorized row batch
+   */
+  public static void setBatchSize(VectorizedRowBatch batch, int size) {
+    assert (size <= batch.getMaxSize());
+    batch.size = size;
+  }
+
+  /**
+   * Walk through the object inspector and add column vectors
+   *
+   * @param oi
+   * @param cvList
+   *          ColumnVectors are populated in this list
+   */
+  private static void allocateColumnVector(StructObjectInspector oi,
+      List<ColumnVector> cvList) throws HiveException {
+    if (cvList == null) {
+      throw new HiveException("Null columnvector list");
+    }
+    if (oi == null) {
+      return;
+    }
+    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
+    for(StructField field : fields) {
+      ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
+      switch(fieldObjectInspector.getCategory()) {
+      case PRIMITIVE:
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
+        switch(poi.getPrimitiveCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case TIMESTAMP:
+        case DATE:
+          cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
+          break;
+        case FLOAT:
+        case DOUBLE:
+          cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
+          break;
+        case STRING:
+          cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
+          break;
+        case DECIMAL:
+          DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
+          cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+              tInfo.precision(), tInfo.scale()));
+          break;
+        default:
+          throw new HiveException("Vectorization is not supported for datatype:"
+              + poi.getPrimitiveCategory());
+        }
+        break;
+      case STRUCT:
+        throw new HiveException("Struct not supported");
+      default:
+        throw new HiveException("Flattening is not supported for datatype:"
+            + fieldObjectInspector.getCategory());
+      }
+    }
+  }
+
+
+  /**
+   * Create VectorizedRowBatch from ObjectInspector
+   *
+   * @param oi
+   * @return
+   * @throws HiveException
+   */
+  public static VectorizedRowBatch constructVectorizedRowBatch(
+      StructObjectInspector oi) throws HiveException {
+    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
+    allocateColumnVector(oi, cvList);
+    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
+    int i = 0;
+    for(ColumnVector cv : cvList) {
+      result.cols[i++] = cv;
+    }
+    return result;
+  }
+
+  /**
+   * Create VectorizedRowBatch from key and value object inspectors
+   *
+   * @param keyInspector
+   * @param valueInspector
+   * @return VectorizedRowBatch
+   * @throws HiveException
+   */
+  public static VectorizedRowBatch constructVectorizedRowBatch(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+      throws HiveException {
+    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
+    allocateColumnVector(keyInspector, cvList);
+    allocateColumnVector(valueInspector, cvList);
+    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
+    result.cols = cvList.toArray(result.cols);
+    return result;
+  }
+
+  /**
+   * Iterates through all columns in a given row and populates the batch
+   *
+   * @param row
+   * @param oi
+   * @param rowIndex
+   * @param batch
+   * @param buffer
+   * @throws HiveException
+   */
+  public static void addRowToBatch(Object row, StructObjectInspector oi,
+                                   int rowIndex,
+                                   VectorizedRowBatch batch,
+                                   DataOutputBuffer buffer
+                                   ) throws HiveException {
+    addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
+  }
+
+  /**
    * Iterates thru all the columns in a given row and populates the batch
+   * from a given offset
+   *
    * @param row Deserialized row object
    * @param oi Object insepector for that row
    * @param rowIndex index to which the row should be added to batch
+   * @param colOffset offset from where the column begins
    * @param batch Vectorized batch to which the row is added at rowIndex
    * @throws HiveException
    */
-  public static void addRowToBatch(Object row, StructObjectInspector oi,
+  public static void addRowToBatchFrom(Object row, StructObjectInspector oi,
                                    int rowIndex,
+                                   int colOffset,
                                    VectorizedRowBatch batch,
                                    DataOutputBuffer buffer
                                    ) throws HiveException {
     List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    final int off = colOffset;
     // Iterate thru the cols and load the batch
     for (int i = 0; i < fieldRefs.size(); i++) {
       Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
@@ -98,7 +237,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       // float/double. String types have no default value for null.
       switch (poi.getPrimitiveCategory()) {
       case BOOLEAN: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((BooleanWritable) writableCol).get() ? 1 : 0;
           lcv.isNull[rowIndex] = false;
@@ -109,7 +248,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case BYTE: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((ByteWritable) writableCol).get();
           lcv.isNull[rowIndex] = false;
@@ -120,7 +259,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case SHORT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
           lcv.isNull[rowIndex] = false;
@@ -131,7 +270,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case INT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
           lcv.isNull[rowIndex] = false;
@@ -142,7 +281,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
      case LONG: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
           lcv.isNull[rowIndex] = false;
@@ -153,7 +292,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case DATE: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           lcv.vector[rowIndex] = ((DateWritable) writableCol).getDays();
           lcv.isNull[rowIndex] = false;
@@ -164,7 +303,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case FLOAT: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
           dcv.isNull[rowIndex] = false;
@@ -175,7 +314,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case DOUBLE: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
           dcv.isNull[rowIndex] = false;
@@ -186,7 +325,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case TIMESTAMP: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        LongColumnVector lcv = (LongColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
           lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
@@ -198,7 +337,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case STRING: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
+        BytesColumnVector bcv = (BytesColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           bcv.isNull[rowIndex] = false;
           Text colText = (Text) writableCol;
@@ -216,7 +355,7 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
       }
         break;
       case DECIMAL:
-        DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[i];
+        DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[off+i];
         if (writableCol != null) {
           dcv.isNull[rowIndex] = false;
           HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
@@ -234,3 +373,4 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
   }
 }
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
index ae65c30..d49136f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
@@ -443,6 +443,21 @@ public static VectorExpressionWriter genVectorExpressionWritable(
     }
   }
 
+  /**
+   * Compiles the appropriate vector expression writers based on a struct object
+   * inspector.
+   */
+  public static VectorExpressionWriter[] genVectorStructExpressionWritables(
+      StructObjectInspector oi) throws HiveException {
+    VectorExpressionWriter[] writers = new VectorExpressionWriter[oi.getAllStructFieldRefs().size()];
+    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
+    int i = 0;
+    for(StructField field : fields) {
+      writers[i++] = genVectorExpressionWritable(field.getFieldObjectInspector());
+    }
+    return writers;
+  }
+
   private static VectorExpressionWriter genVectorExpressionWritableDecimal(
       SettableHiveDecimalObjectInspector fieldObjInspector) throws HiveException {
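
Reviewer note, not part of the patch: a minimal sketch of how the new VectorizedBatchUtil helpers above are intended to compose on the reduce side, mirroring ReduceRecordProcessor.processVectors(). The method name feedVectorizedReducer and its parameters are illustrative only; value rows are assumed to be already deserialized, and the per-tag batch caching and error reporting done in the real code are omitted.

  // Assumed imports: org.apache.hadoop.hive.ql.exec.Operator,
  // org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil,
  // org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch,
  // org.apache.hadoop.hive.ql.metadata.HiveException,
  // org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector,
  // org.apache.hadoop.io.DataOutputBuffer.
  static void feedVectorizedReducer(Operator<?> reducer, byte tag,
      StructObjectInspector keyInspector, StructObjectInspector valueInspector,
      Object keyObject, Iterable<Object> valueRows) throws HiveException {
    // Key columns come first, then value columns, matching the layout produced by
    // constructVectorizedRowBatch(keyInspector, valueInspector).
    VectorizedRowBatch batch =
        VectorizedBatchUtil.constructVectorizedRowBatch(keyInspector, valueInspector);
    int keysColumnOffset = keyInspector.getAllStructFieldRefs().size();
    DataOutputBuffer buffer = new DataOutputBuffer();

    // The group key is written once at row 0 and marked repeating, so it is not
    // copied into every row of the batch.
    VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyInspector, 0, 0, batch, buffer);
    for (int i = 0; i < keysColumnOffset; i++) {
      VectorizedBatchUtil.setRepeatingColumn(batch, i);
    }

    // Value rows land at increasing row indexes, offset past the key columns; the
    // batch is handed to the reduce operator whenever it fills up.
    int rowIdx = 0;
    for (Object valueRow : valueRows) {
      VectorizedBatchUtil.addRowToBatchFrom(valueRow, valueInspector,
          rowIdx, keysColumnOffset, batch, buffer);
      rowIdx++;
      if (rowIdx >= VectorizedRowBatch.DEFAULT_SIZE) {
        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
        reducer.processOp(batch, tag);
        rowIdx = 0;
      }
    }
    if (rowIdx > 0) {
      VectorizedBatchUtil.setBatchSize(batch, rowIdx);
      reducer.processOp(batch, tag);
    }
  }

The repeating-key columns are the main design point of the patch: within a reduce group every row shares the same key, so the key columns are populated once and flagged isRepeating, while only the value columns grow row by row and the batch is flushed in BATCH_SIZE chunks.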