diff --git common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
index a5d7399..fa7a257 100644
--- common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
+++ common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
@@ -90,7 +90,7 @@
    * @serial
    * @see #getUnscaledValue()
    */
-  private final UnsignedInt128 unscaledValue;
+  private UnsignedInt128 unscaledValue;
 
   /**
    * The scale of this Decimal128, as returned by {@link #getScale()}. Unlike
@@ -731,8 +731,7 @@ public Decimal128 update(char[] str, int offset, int length, short scale) {
 
   /**
    * Serializes the value in a format compatible with the BigDecimal's own representation
-   * @param bytes
-   * @param offset
+   * @param scratch
    */
   public int fastSerializeForHiveDecimal(Decimal128FastBuffer scratch) {
     return this.unscaledValue.fastSerializeForHiveDecimal(scratch, this.signum);
@@ -1892,5 +1891,26 @@ public Decimal128 fastUpdateFromInternalStorage(byte[] internalStorage, short sc
     return this;
   }
 
+
+  /**
+   * This setter is only for de-serialization; it should not be used otherwise.
+   */
+  public void setUnscaledValue(UnsignedInt128 unscaledValue) {
+    this.unscaledValue = unscaledValue;
+  }
+
+  /**
+   * This setter is only for de-serialization; it should not be used otherwise.
+   */
+  public void setScale(short scale) {
+    this.scale = scale;
+  }
+
+  /**
+   * This setter is only for de-serialization; it should not be used otherwise.
+   */
+  public void setSignum(byte signum) {
+    this.signum = signum;
+  }
 }
diff --git common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
index 74168bd..34bd9d0 100644
--- common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
+++ common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
@@ -15,6 +15,7 @@
  */
 package org.apache.hadoop.hive.common.type;
 
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.IntBuffer;
@@ -35,7 +36,7 @@
  * SQL (e.g., exact POWER/SQRT).
  *
  */
-public final class UnsignedInt128 implements Comparable<UnsignedInt128> {
+public final class UnsignedInt128 implements Comparable<UnsignedInt128>, Serializable {
 
   /** Number of ints to store this object. */
   public static final int INT_COUNT = 4;
@@ -61,7 +62,7 @@
    * Int32 elements as little-endian (v[0] is least significant) unsigned
    * integers.
    */
-  private final int[] v = new int[INT_COUNT];
+  private int[] v = new int[INT_COUNT];
 
   /**
    * Number of leading non-zero elements in {@link #v}. For example, if the
@@ -70,7 +71,7 @@
    *
    * @see #updateCount()
    */
-  private transient byte count;
+  private byte count;
 
   /**
    * Determines the number of ints to store one value.
@@ -2418,7 +2419,7 @@ private void multiplyDestructiveFitsInt32(UnsignedInt128 right,
     }
   }
 
-  /** Updates the value of {@link #cnt} by checking {@link #v}. */
+  /** Updates the value of {@link #count} by checking {@link #v}. */
   private void updateCount() {
     if (v[3] != 0) {
       this.count = (byte) 4;
@@ -2634,4 +2635,30 @@ private int fastUpdateIntFromInternalStorage(byte[] internalStorage,
     }
     return value;
   }
+
+  public int[] getV() {
+    return v;
+  }
+
+  /**
+   * This setter is only for de-serialization; it should not be used otherwise.
+   */
+  public void setV(int[] v) {
+    this.v[0] = v[0];
+    this.v[1] = v[1];
+    this.v[2] = v[2];
+    this.v[3] = v[3];
+    updateCount();
+  }
+
+  public byte getCount() {
+    return count;
+  }
+
+  /**
+   * This setter is only for de-serialization; it should not be used otherwise.
+   */
+  public void setCount(byte count) {
+    this.count = count;
+  }
 }
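The setters above exist so a field-by-field deserializer can rebuild a value now that the fields are no longer final. A minimal sketch of that rehydration path; only the getters and setters come from this patch, while the helper class, its inputs, and the zero-value no-arg constructors are assumptions for illustration:

    import org.apache.hadoop.hive.common.type.Decimal128;
    import org.apache.hadoop.hive.common.type.UnsignedInt128;

    // Hypothetical helper, not part of this patch.
    public class Decimal128Rehydrator {
      /** Rebuilds a Decimal128 from parts read off some serialized form. */
      public static Decimal128 fromParts(int[] words, short scale, byte signum) {
        UnsignedInt128 magnitude = new UnsignedInt128();  // assumed to construct zero
        // setV copies all four little-endian words and calls updateCount(),
        // so the cached count need not be transmitted separately.
        magnitude.setV(words);

        Decimal128 result = new Decimal128();  // assumed to construct zero
        result.setUnscaledValue(magnitude);
        result.setScale(scale);
        result.setSignum(signum);
        return result;
      }
    }
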
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index edbbbec..ab780db 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -43,6 +45,10 @@
 
   private static final long serialVersionUID = 1L;
 
+  protected transient Object[] singleRow;
+
+  protected transient VectorExpressionWriter[] valueWriters;
+
   public VectorFileSinkOperator(VectorizationContext context,
       OperatorDesc conf) {
     super();
@@ -54,35 +60,44 @@
   public VectorFileSinkOperator() {
   }
 
   @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    valueWriters = VectorExpressionWriterFactory.getExpressionWriters(
+        (StructObjectInspector) inputObjInspectors[0]);
+    singleRow = new Object[valueWriters.length];
+  }
+
+  @Override
   public void processOp(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
 
     Writable [] records = null;
     boolean vectorizedSerde = false;
-    int outputIterations = 1;
     try {
       if (serializer instanceof VectorizedSerde) {
         recordValue = ((VectorizedSerde) serializer).serializeVector(vrg,
             inputObjInspectors[0]);
         records = (Writable[]) ((ObjectWritable) recordValue).get();
         vectorizedSerde = true;
-        outputIterations = vrg.size;
       }
     } catch (SerDeException e1) {
       throw new HiveException(e1);
     }
 
-    for (int i = 0; i < outputIterations; i++) {
+    for (int i = 0; i < vrg.size; i++) {
       Writable row = null;
       if (vectorizedSerde) {
         row = records[i];
       } else {
         if (vrg.valueWriters == null) {
-          vrg.setValueWriters(VectorExpressionWriterFactory.getExpressionWriters(
-              (StructObjectInspector)inputObjInspectors[0]));
+          vrg.setValueWriters(this.valueWriters);
+        }
+        try {
+          row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]);
+        } catch (SerDeException ex) {
+          throw new HiveException(ex);
         }
-        row = new Text(vrg.toString());
       }
 
       /* Create list bucketing sub-directory only if stored-as-directories is on.
       */
      String lbDirName = null;
@@ -159,4 +174,13 @@ public void processOp(Object data, int tag) throws HiveException {
       }
     }
   }
+
+  private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
+      throws HiveException {
+    for (int i = 0; i < vrg.projectionSize; i++) {
+      ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+      singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, rowIndex);
+    }
+    return singleRow;
+  }
 }
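The fallback branch above no longer emits the whole batch's toString() as one Text row; it materializes each row through the cached VectorExpressionWriters and feeds it to the regular SerDe. The same per-row loop in isolation, as a sketch: `writers` and `scratch` are hypothetical stand-ins for the operator fields valueWriters and singleRow, assumed to be initialized elsewhere.

    // Sketch of the row-materialization pattern used by processOp/getRowObject.
    private void writeBatch(VectorizedRowBatch batch, Serializer rowSerde,
        ObjectInspector rowInspector) throws HiveException {
      for (int r = 0; r < batch.size; r++) {
        // Walk only the projected columns; each writer converts one cell.
        for (int c = 0; c < batch.projectionSize; c++) {
          ColumnVector col = batch.cols[batch.projectedColumns[c]];
          scratch[c] = writers[c].writeValue(col, r);
        }
        try {
          Writable row = rowSerde.serialize(scratch, rowInspector);
          // hand `row` to the underlying FileSinkOperator record writer here
        } catch (SerDeException e) {
          throw new HiveException(e);
        }
      }
    }

Reusing one scratch Object[] across rows avoids a per-row allocation, which is why the patch builds singleRow once in initializeOp().
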
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
index 868f13e..886154d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
@@ -1045,7 +1045,7 @@ public VectorExpressionWriterSetter init(
 
       @Override
       public Object writeValue(ColumnVector column, int row) throws HiveException {
-        throw new HiveException("Should never reach here");
+        return baseWriter.writeValue(column, row);
      }
 
      @Override
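With the stub replaced by delegation to the wrapped baseWriter, the setter-flavored writers can also answer read-style calls, which is exactly what the new getRowObject() path exercises. A hypothetical caller, where `writer` is any VectorExpressionWriter obtained from this factory and `batch` a populated VectorizedRowBatch:

    // Before this fix, the setter-backed writers threw
    // HiveException("Should never reach here") on writeValue().
    ColumnVector col = batch.cols[batch.projectedColumns[0]];
    Object cell = writer.writeValue(col, 0);  // now forwards to baseWriter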