diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 2acd842..578210d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -59,7 +59,6 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -104,9 +103,12 @@
 
   private DataOutputBuffer buffer;
   private VectorizedRowBatch[] batches;
+  private VectorizedBatchUtil.BatchAccessor[] accessors;
   // number of columns pertaining to keys in a vectorized row batch
   private int keysColumnOffset;
   private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  // reusable scratch array holding up to one batch worth of deserialized value rows
+  private final Object[] convey = new Object[BATCH_SIZE];
+
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
@@ -151,6 +153,7 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
     final int maxTags = redWork.getTagToValueDesc().size();
     keyStructInspector = (StructObjectInspector) keyObjectInspector;
     batches = new VectorizedRowBatch[maxTags];
+    accessors = new VectorizedBatchUtil.BatchAccessor[maxTags];
     valueStructInspectors = new StructObjectInspector[maxTags];
     valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
     keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
@@ -175,6 +178,8 @@ void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mr
       batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
           valueStructInspectors[tag]);
+      accessors[tag] = VectorizedBatchUtil.build(valueStructInspectors[tag],
+          batches[tag], buffer, keysColumnOffset);
       final int totalColumns = keysColumnOffset +
           valueStructInspectors[tag].getAllStructFieldRefs().size();
       valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -385,15 +390,19 @@ private boolean processRows(Object key, Iterable<Object> values) {
   }
 
   private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
+    // delegate to the SerDe-parameterized overload below; calling this same
+    // two-argument method from here would recurse without terminating
+    return deserializeValue(valueWritable, inputValueDeserializer[tag], tag);
+  }
+
+  private Object deserializeValue(BytesWritable valueWritable, SerDe serDe, byte tag)
+      throws HiveException {
     try {
-      return inputValueDeserializer[tag].deserialize(valueWritable);
+      return serDe.deserialize(valueWritable);
     } catch (SerDeException e) {
       throw new HiveException(
           "Hive Runtime Error: Unable to deserialize reduce input value (tag="
               + tag + ") from "
               + Utilities.formatBinaryString(valueWritable.getBytes(), 0,
-                 valueWritable.getLength()) + " with properties "
+                  valueWritable.getLength()) + " with properties "
              + valueTableDesc[tag].getProperties(), e);
     }
   }
@@ -431,6 +440,14 @@ private boolean processKeyValues(Iterable<Object> values, byte tag) throws HiveE
     return true; //give me more
   }
 
+  // Deserializes at most convey.length (= BATCH_SIZE) values from the iterator into
+  // the convey scratch array and returns the number of rows taken. The bound keeps
+  // a key group with more than BATCH_SIZE values from overrunning the array.
+  private int deserialize(Iterator<Object> values, SerDe serDe, byte tag) throws HiveException {
+    int index = 0;
+    while (values.hasNext() && index < convey.length) {
+      convey[index++] = deserializeValue((BytesWritable) values.next(), serDe, tag);
+    }
+    return index;
+  }
+
   /**
    * @param values
    * @return true if it is not done and can take more inputs
@@ -447,30 +464,20 @@ private boolean processVectors(Iterable<Object> values, byte tag) throws HiveExc
     }
     int rowIdx = 0;
+    Iterator<Object> iterator = values.iterator();
     try {
-      for (Object value : values) {
-        /* deserialize value into columns */
-        BytesWritable valueWritable = (BytesWritable) value;
-        Object valueObj = deserializeValue(valueWritable, tag);
-
-        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag],
-            rowIdx, keysColumnOffset, batch, buffer);
-        rowIdx++;
-        if (rowIdx >= BATCH_SIZE) {
-          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
-          rowIdx = 0;
-          if (isLogInfoEnabled) {
-            logProgress();
-          }
+      while (iterator.hasNext()) {
+        // drain up to one batch worth of values, then flush that batch
+        int length = deserialize(iterator, inputValueDeserializer[tag], tag);
+        for (int i = 0; i < length; i++) {
+          accessors[tag].visit(convey[i], rowIdx++);
         }
-      }
-      if (rowIdx > 0) {
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
         reducer.processOp(batch, tag);
-      }
-      if (isLogInfoEnabled) {
-        logProgress();
+        rowIdx = 0;
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
       }
     } catch (Exception e) {
       String rowString = null;
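Note on the ReduceRecordProcessor change above: the rewritten processVectors() no longer adds rows to the batch one value at a time; it drains the value iterator in chunks of at most BATCH_SIZE into the reusable convey array and flushes one vectorized batch per chunk. The following is a minimal, self-contained sketch of that drain-and-flush pattern; the class name and the println stand-ins for deserializeValue() and reducer.processOp() are illustrative only, not part of the patch.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChunkedDrainSketch {
  private static final int BATCH_SIZE = 4; // VectorizedRowBatch.DEFAULT_SIZE is 1024 in Hive
  private final Object[] convey = new Object[BATCH_SIZE];

  // Mirrors deserialize(): take at most one batch worth of values.
  private int drain(Iterator<Object> values) {
    int index = 0;
    while (values.hasNext() && index < convey.length) {
      convey[index++] = values.next(); // stand-in for deserializeValue(...)
    }
    return index;
  }

  // Mirrors the new processVectors() loop: fill rows, flush, reset.
  public void process(Iterable<Object> values) {
    Iterator<Object> iterator = values.iterator();
    while (iterator.hasNext()) {
      int length = drain(iterator);
      System.out.println("flush batch of " + length + " rows"); // stand-in for reducer.processOp(batch, tag)
    }
  }

  public static void main(String[] args) {
    List<Object> nine = Arrays.<Object>asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    new ChunkedDrainSketch().process(nine); // flushes batches of 4, 4 and 1
  }
}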
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 16454e7..30317fd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -19,10 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -204,12 +207,294 @@ public static void addRowToBatch(Object row, StructObjectInspector oi,
     addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
   }
 
+  public static BatchAccessor build(StructObjectInspector oi, VectorizedRowBatch vector,
+      DataOutputBuffer buffer, int colOffset) throws HiveException {
+    return new BatchAccessor(oi, vector, buffer, colOffset);
+  }
+
+  public static final class BatchAccessor {
+
+    private final ColumnVector[] cols;
+    private final FieldAccessor<?>[] accessor;
+    private final StructObjectInspector oi;
+    private final StructField[] fields;
+    // index of the first value column in the batch; key columns precede it
+    private final int colOffset;
+
+    BatchAccessor(StructObjectInspector oi, VectorizedRowBatch vector, DataOutputBuffer buffer,
+        int colOffset) throws HiveException {
+      List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+      accessor = new FieldAccessor<?>[fieldRefs.size()];
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+        // Vectorization only supports PRIMITIVE data types; assert the same
+        if (foi.getCategory() != Category.PRIMITIVE) {
+          throw new HiveException("Vectorization is not supported for data type: "
+              + foi.getTypeName());
+        }
+        // Pick the accessor matching the column's primitive category
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+        switch (poi.getPrimitiveCategory()) {
+        case BOOLEAN:
+          accessor[i] = new BooleanAccessor(poi, buffer, colOffset + i);
+          break;
+        case BYTE:
+          accessor[i] = new ByteAccessor(poi, buffer, colOffset + i);
+          break;
+        case SHORT:
+          accessor[i] = new ShortAccessor(poi, buffer, colOffset + i);
+          break;
+        case INT:
+          accessor[i] = new IntAccessor(poi, buffer, colOffset + i);
+          break;
+        case LONG:
+          accessor[i] = new LongAccessor(poi, buffer, colOffset + i);
+          break;
+        case DATE:
+          accessor[i] = new DateAccessor(poi, buffer, colOffset + i);
+          break;
+        case FLOAT:
+          accessor[i] = new FloatAccessor(poi, buffer, colOffset + i);
+          break;
+        case DOUBLE:
+          accessor[i] = new DoubleAccessor(poi, buffer, colOffset + i);
+          break;
+        case TIMESTAMP:
+          accessor[i] = new TimestampAccessor(poi, buffer, colOffset + i);
+          break;
+        case BINARY:
+          accessor[i] = new BinaryAccessor(poi, buffer, colOffset + i);
+          break;
+        case STRING:
+          accessor[i] = new StringAccessor(poi, buffer, colOffset + i);
+          break;
+        case DECIMAL:
+          accessor[i] = new DecimalAccessor(poi, buffer, colOffset + i);
+          break;
+        default:
+          throw new HiveException("Vectorization is not supported for data type: "
+              + poi.getPrimitiveCategory());
+        }
+      }
+      this.oi = oi;
+      this.cols = vector.cols;
+      this.fields = fieldRefs.toArray(new StructField[fieldRefs.size()]);
+      this.colOffset = colOffset;
+    }
+
+    // by row: writes one deserialized row into the value columns at rowIndex;
+    // iterates over the value accessors, offset past the key columns
+    public void visit(Object row, int rowIndex) throws Exception {
+      for (int i = 0; i < accessor.length; i++) {
+        Object columnValue = oi.getStructFieldData(row, fields[i]);
+        accessor[i].visit(columnValue, cols[colOffset + i], rowIndex);
+      }
+    }
+
+    // by column: walks the rows once per column
+    public void visit(Iterable<Object> rows) throws Exception {
+      for (int i = 0; i < accessor.length; i++) {
+        visit(rows.iterator(), i);
+      }
+    }
+
+    public void visit(Iterator<Object> rows, int colIndex) throws Exception {
+      int rowIndex = 0;
+      while (rows.hasNext()) {
+        Object columnValue = oi.getStructFieldData(rows.next(), fields[colIndex]);
+        accessor[colIndex].visit(columnValue, cols[colOffset + colIndex], rowIndex++);
+      }
+    }
+  }
+
+  private abstract static class FieldAccessor<T extends ColumnVector> {
+
+    protected final PrimitiveObjectInspector poi;
+    protected final DataOutputBuffer buffer;
+    protected final int colOffset;
+    protected final boolean preferWritable;
+
+    protected FieldAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      this.poi = poi;
+      this.buffer = buffer;
+      this.colOffset = colOffset;
+      this.preferWritable = poi.preferWritable();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void visit(Object field, ColumnVector vector, int rowIndex) throws Exception {
+      Object value = preferWritable ?
+          poi.getPrimitiveWritableObject(field) : poi.getPrimitiveJavaObject(field);
+      if (value == null) {
+        setNullColIsNullValue(vector, rowIndex);
+      } else {
+        setValue(value, (T) vector, rowIndex);
+      }
+    }
+
+    protected abstract void setValue(Object value, T vector, int rowIndex) throws Exception;
+  }
+
+  private static class BooleanAccessor extends FieldAccessor<LongColumnVector> {
+    BooleanAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] =
+          (preferWritable ? ((BooleanWritable) value).get() : (Boolean) value) ? 1 : 0;
+    }
+  }
+
+  private static class ByteAccessor extends FieldAccessor<LongColumnVector> {
+    ByteAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((ByteWritable) value).get() : (Byte) value;
+    }
+  }
+
+  private static class ShortAccessor extends FieldAccessor<LongColumnVector> {
+    ShortAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((ShortWritable) value).get() : (Short) value;
+    }
+  }
+
+  private static class IntAccessor extends FieldAccessor<LongColumnVector> {
+    IntAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((IntWritable) value).get() : (Integer) value;
+    }
+  }
+
+  private static class LongAccessor extends FieldAccessor<LongColumnVector> {
+    LongAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((LongWritable) value).get() : (Long) value;
+    }
+  }
+
+  private static class DateAccessor extends FieldAccessor<LongColumnVector> {
+    DateAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ?
+          ((DateWritable) value).getDays() : DateWritable.dateToDays((Date) value);
+    }
+  }
+
+  private static class FloatAccessor extends FieldAccessor<DoubleColumnVector> {
+    FloatAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, DoubleColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((FloatWritable) value).get() : (Float) value;
+    }
+  }
+
+  private static class DoubleAccessor extends FieldAccessor<DoubleColumnVector> {
+    DoubleAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, DoubleColumnVector vector, int rowIndex) {
+      vector.vector[rowIndex] = preferWritable ? ((DoubleWritable) value).get() : (Double) value;
+    }
+  }
+
+  private static class TimestampAccessor extends FieldAccessor<LongColumnVector> {
+    TimestampAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, LongColumnVector vector, int rowIndex) {
+      Timestamp t = preferWritable ?
+          ((TimestampWritable) value).getTimestamp() : (Timestamp) value;
+      vector.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
+    }
+  }
+
+  private static class DecimalAccessor extends FieldAccessor<DecimalColumnVector> {
+    DecimalAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, DecimalColumnVector vector, int rowIndex) {
+      if (preferWritable) {
+        HiveDecimalWritable wobj = (HiveDecimalWritable) value;
+        vector.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(), (short) wobj.getScale());
+      } else {
+        HiveDecimal decimal = (HiveDecimal) value;
+        vector.vector[rowIndex].update(decimal.unscaledValue(), (short) decimal.scale());
+      }
+    }
+  }
+
+  private static class BinaryAccessor extends FieldAccessor<BytesColumnVector> {
+    BinaryAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, BytesColumnVector vector, int rowIndex)
+        throws IOException {
+      int start = buffer.getLength();
+      byte[] bytes;
+      int length;
+      if (preferWritable) {
+        BytesWritable bw = (BytesWritable) value;
+        bytes = bw.getBytes();
+        length = bw.getLength();
+      } else {
+        bytes = (byte[]) value;
+        length = bytes.length;
+      }
+      buffer.write(bytes, 0, length);
+      vector.setRef(rowIndex, buffer.getData(), start, length);
+    }
+  }
+
+  private static class StringAccessor extends FieldAccessor<BytesColumnVector> {
+    StringAccessor(PrimitiveObjectInspector poi, DataOutputBuffer buffer, int colOffset) {
+      super(poi, buffer, colOffset);
+    }
+    @Override
+    protected void setValue(Object value, BytesColumnVector vector, int rowIndex)
+        throws IOException {
+      int start = buffer.getLength();
+      byte[] bytes;
+      int length;
+      if (preferWritable) {
+        Text text = (Text) value;
+        bytes = text.getBytes();
+        length = text.getLength();
+      } else {
+        // ByteBuffer.array() may be longer than the encoded content; use limit()
+        ByteBuffer encoded = Text.encode((String) value);
+        bytes = encoded.array();
+        length = encoded.limit();
+      }
+      buffer.write(bytes, 0, length);
+      vector.setRef(rowIndex, buffer.getData(), start, length);
+    }
+  }
+
   /**
    * Iterates thru all the columns in a given row and populates the batch
    * from a given offset
    *
    * @param row Deserialized row object
-   * @param oi Object insepector for that row
+   * @param oi Object inspector for that row
    * @param rowIndex index to which the row should be added to batch
    * @param colOffset offset from where the column begins
    * @param batch Vectorized batch to which the row is added at rowIndex
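For reference, a hedged sketch of how the new BatchAccessor API introduced above is wired together. Only build(), visit(), constructVectorizedRowBatch(), and setBatchSize() are taken from the patch; the class, the method, and the list of already-deserialized rows are assumed scaffolding standing in for the reduce-side plumbing in ReduceRecordProcessor.

import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.DataOutputBuffer;

public class BatchAccessorUsageSketch {
  // Fills one vectorized batch from already-deserialized value rows.
  static VectorizedRowBatch fillBatch(StructObjectInspector keyInspector,
      StructObjectInspector valueInspector, List<Object> rows) throws Exception {
    VectorizedRowBatch batch =
        VectorizedBatchUtil.constructVectorizedRowBatch(keyInspector, valueInspector);
    DataOutputBuffer buffer = new DataOutputBuffer(); // backs string/binary column refs
    int keysColumnOffset = keyInspector.getAllStructFieldRefs().size();

    // Built once per (inspector, batch) pair, then reused row by row.
    VectorizedBatchUtil.BatchAccessor accessor =
        VectorizedBatchUtil.build(valueInspector, batch, buffer, keysColumnOffset);

    int rowIdx = 0;
    for (Object row : rows) {
      accessor.visit(row, rowIdx++); // row-at-a-time fill of the value columns
    }
    VectorizedBatchUtil.setBatchSize(batch, rowIdx);
    return batch;
  }
}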