diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 2acd842..ad3ead3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -448,14 +447,13 @@ private boolean processVectors(Iterable<Object> values, byte tag) throws HiveExc

     int rowIdx = 0;
     try {
+      VectorizedBatchUtil.BatchAccessor accessor =
+          VectorizedBatchUtil.build(valueStructInspectors[tag], batch, buffer, keysColumnOffset);
       for (Object value : values) {
         /* deserialize value into columns */
         BytesWritable valueWritable = (BytesWritable) value;
         Object valueObj = deserializeValue(valueWritable, tag);
-
-        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
-            rowIdx, keysColumnOffset, batch, buffer);
-        rowIdx++;
+        accessor.visit(valueObj, rowIdx++);
         if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
           reducer.processOp(batch, tag);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 16454e7..8b47537 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hive.ql.exec.vector;

 import java.io.IOException;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.LinkedList;
 import java.util.List;

+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -204,6 +206,364 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
     addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
   }

+  /**
+   * Creates a {@link BatchAccessor} that copies rows described by {@code oi} into
+   * {@code vector}, writing field {@code i} of each row into column {@code colOffset + i}.
+   *
+   * <p>Per-field type dispatch is resolved once here instead of once per row, so callers
+   * should build one accessor and reuse it for every row written into the same batch.
+   *
+   * @param oi inspector for the rows later passed to {@link BatchAccessor#visit}
+   * @param vector batch to populate; its {@code cols} array is captured by reference
+   * @param buffer scratch buffer that holds the bytes of STRING and BINARY values
+   * @param colOffset batch column that receives the first field of each row
+   * @throws HiveException if a field is not a supported primitive type
+   */
+  public static BatchAccessor build(StructObjectInspector oi, VectorizedRowBatch vector,
+      DataOutputBuffer buffer, int colOffset) throws HiveException {
+    return new BatchAccessor(oi, vector, buffer, colOffset);
+  }
+
+  /**
+   * Reusable row-to-batch copier created by {@link VectorizedBatchUtil#build}. Each call to
+   * {@link #visit} writes one row into the captured batch columns.
+   */
+  public static final class BatchAccessor {
+    private final ColumnVector[] cols;
+    private final FieldAccessor<?>[] accessors;
+    private final StructObjectInspector oi;
+    private final DataOutputBuffer buffer;
+
+    BatchAccessor(StructObjectInspector oi, VectorizedRowBatch vector, DataOutputBuffer buffer,
+        int colOffset) throws HiveException {
+      List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+      accessors = new FieldAccessor<?>[fieldRefs.size()];
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+        // Vectorization only supports PRIMITIVE data types. Assert the same
+        if (foi.getCategory() != Category.PRIMITIVE) {
+          throw new HiveException("Vectorizaton is not supported for datatype:"
+              + foi.getTypeName());
+        }
+        // Field i lands in batch column colOffset + i (e.g. after the reduce key columns).
+        accessors[i] = newFieldAccessor((PrimitiveObjectInspector) foi, colOffset + i);
+      }
+      this.oi = oi;
+      this.cols = vector.cols;
+      this.buffer = buffer;
+    }
+
+    /**
+     * Copies every field of {@code row} into row {@code index} of the batch.
+     *
+     * @param row row object readable by the struct inspector passed to {@code build}
+     * @param index row position within the batch
+     */
+    public void visit(Object row, int index) {
+      List<Object> fieldData = oi.getStructFieldsDataAsList(row);
+      for (int i = 0; i < accessors.length; i++) {
+        accessors[i].visit(fieldData.get(i), cols, buffer, index);
+      }
+    }
+  }
+
+  /**
+   * Returns the accessor for one primitive field written to batch column {@code column}.
+   *
+   * @throws HiveException if the primitive category has no vectorized representation here
+   */
+  private static FieldAccessor<?> newFieldAccessor(PrimitiveObjectInspector poi, int column)
+      throws HiveException {
+    switch (poi.getPrimitiveCategory()) {
+      case BOOLEAN:
+        return new BooleanAccessor(poi, column);
+      case BYTE:
+        return new ByteAccessor(poi, column);
+      case SHORT:
+        return new ShortAccessor(poi, column);
+      case INT:
+        return new IntAccessor(poi, column);
+      case LONG:
+        return new LongAccessor(poi, column);
+      case DATE:
+        return new DateAccessor(poi, column);
+      case FLOAT:
+        return new FloatAccessor(poi, column);
+      case DOUBLE:
+        return new DoubleAccessor(poi, column);
+      case TIMESTAMP:
+        return new TimestampAccessor(poi, column);
+      case DECIMAL:
+        return new DecimalAccessor(poi, column);
+      case BINARY:
+        return new BinaryAccessor(poi, column);
+      case STRING:
+        return new StringAccessor(poi, column);
+      default:
+        throw new HiveException("Vectorizaton is not supported for datatype:"
+            + poi.getPrimitiveCategory());
+    }
+  }
+
+  /**
+   * Appends {@code length} bytes to {@code buffer} and points row {@code rowIndex} of
+   * {@code vector} at the appended copy.
+   */
+  private static void appendBytes(BytesColumnVector vector, DataOutputBuffer buffer,
+      int rowIndex, byte[] bytes, int length) {
+    int start = buffer.getLength();
+    try {
+      buffer.write(bytes, 0, length);
+    } catch (IOException ioe) {
+      throw new IllegalStateException("bad write", ioe);
+    }
+    // Read the backing array only after the write, which may have reallocated it.
+    vector.setRef(rowIndex, buffer.getData(), start, length);
+  }
+
+  /**
+   * Writes one primitive field into a fixed column of a batch.
+   *
+   * @param <T> column vector type the field is stored in
+   */
+  private abstract static class FieldAccessor<T extends ColumnVector> {
+    protected final PrimitiveObjectInspector poi;
+    protected final boolean preferWritable;
+    // Absolute index of the batch column this field is written to.
+    protected final int column;
+
+    protected FieldAccessor(PrimitiveObjectInspector poi, int column) {
+      this.poi = poi;
+      this.preferWritable = poi.preferWritable();
+      this.column = column;
+    }
+
+    /**
+     * Stores {@code field} in row {@code rowIndex} of {@code cols[column]}, or marks that
+     * slot null when the inspector yields no value.
+     */
+    @SuppressWarnings("unchecked")
+    public void visit(Object field, ColumnVector[] cols, DataOutputBuffer buffer, int rowIndex) {
+      T vector = (T) cols[column];
+      Object value = preferWritable
+          ? poi.getPrimitiveWritableObject(field) : poi.getPrimitiveJavaObject(field);
+      if (value == null) {
+        setNullColIsNullValue(vector, rowIndex);
+      } else {
+        // Clear a null marker possibly left in this slot by an earlier row.
+        vector.isNull[rowIndex] = false;
+        setValue(value, vector, buffer, rowIndex);
+      }
+    }
+
+    /** Stores a non-null {@code value} (writable or Java form per {@code preferWritable}). */
+    protected abstract void setValue(Object value, T vector, DataOutputBuffer buffer,
+        int rowIndex);
+  }
+
+  private static class BooleanAccessor extends FieldAccessor<LongColumnVector> {
+    protected BooleanAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((BooleanWritable) value).get() ? 1 : 0;
+      } else {
+        vector.vector[rowIndex] = (Boolean) value ? 1 : 0;
+      }
+    }
+  }
+
+  private static class ByteAccessor extends FieldAccessor<LongColumnVector> {
+    protected ByteAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((ByteWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Byte) value;
+      }
+    }
+  }
+
+  private static class ShortAccessor extends FieldAccessor<LongColumnVector> {
+    protected ShortAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((ShortWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Short) value;
+      }
+    }
+  }
+
+  private static class IntAccessor extends FieldAccessor<LongColumnVector> {
+    protected IntAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((IntWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Integer) value;
+      }
+    }
+  }
+
+  private static class LongAccessor extends FieldAccessor<LongColumnVector> {
+    protected LongAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((LongWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Long) value;
+      }
+    }
+  }
+
+  private static class DateAccessor extends FieldAccessor<LongColumnVector> {
+    protected DateAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((DateWritable) value).getDays();
+      } else {
+        vector.vector[rowIndex] = DateWritable.dateToDays((Date) value);
+      }
+    }
+  }
+
+  private static class FloatAccessor extends FieldAccessor<DoubleColumnVector> {
+    protected FloatAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, DoubleColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((FloatWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Float) value;
+      }
+    }
+  }
+
+  private static class DoubleAccessor extends FieldAccessor<DoubleColumnVector> {
+    protected DoubleAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, DoubleColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        vector.vector[rowIndex] = ((DoubleWritable) value).get();
+      } else {
+        vector.vector[rowIndex] = (Double) value;
+      }
+    }
+  }
+
+  private static class TimestampAccessor extends FieldAccessor<LongColumnVector> {
+    protected TimestampAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      Timestamp t = preferWritable
+          ? ((TimestampWritable) value).getTimestamp() : (Timestamp) value;
+      vector.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
+    }
+  }
+
+  private static class DecimalAccessor extends FieldAccessor<DecimalColumnVector> {
+    protected DecimalAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, DecimalColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        HiveDecimalWritable wobj = (HiveDecimalWritable) value;
+        vector.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(),
+            (short) wobj.getScale());
+      } else {
+        HiveDecimal decimal = (HiveDecimal) value;
+        vector.vector[rowIndex].update(decimal.unscaledValue(), (short) decimal.scale());
+      }
+    }
+  }
+
+  private static class BinaryAccessor extends FieldAccessor<BytesColumnVector> {
+    protected BinaryAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, BytesColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      if (preferWritable) {
+        // BytesWritable.getBytes() can be padded; only getLength() bytes are valid.
+        BytesWritable bw = (BytesWritable) value;
+        appendBytes(vector, buffer, rowIndex, bw.getBytes(), bw.getLength());
+      } else {
+        byte[] bytes = (byte[]) value;
+        appendBytes(vector, buffer, rowIndex, bytes, bytes.length);
+      }
+    }
+  }
+
+  private static class StringAccessor extends FieldAccessor<BytesColumnVector> {
+    // Reused to UTF-8 encode java.lang.String values without allocating a Text per row.
+    private final Text scratch = new Text();
+
+    protected StringAccessor(PrimitiveObjectInspector poi, int column) {
+      super(poi, column);
+    }
+
+    @Override
+    protected void setValue(Object value, BytesColumnVector vector, DataOutputBuffer buffer,
+        int rowIndex) {
+      Text text;
+      if (preferWritable) {
+        text = (Text) value;
+      } else {
+        scratch.set((String) value);
+        text = scratch;
+      }
+      // Text.getBytes() can be longer than the encoded value; only getLength() bytes are valid.
+      appendBytes(vector, buffer, rowIndex, text.getBytes(), text.getLength());
+    }
+  }
+
   /**
    * Iterates thru all the columns in a given row and populates the batch
    * from a given offset