diff --git common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
index 076fdc0..0743a80 100644
--- common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
+++ common/src/java/org/apache/hadoop/hive/common/type/Decimal128.java
@@ -16,7 +16,7 @@
 package org.apache.hadoop.hive.common.type;
 
 import java.math.BigDecimal;
-import java.math.MathContext;
+import java.math.BigInteger;
 import java.nio.IntBuffer;
 
 /**
@@ -551,6 +551,26 @@ public void update32(int[] array, int offset) {
   }
 
   /**
+   * Updates the value of this object with the given {@link BigInteger} and scale.
+   *
+   * @param bigInt
+   *          {@link java.math.BigInteger}
+   * @param scale
+   *          scale of the new value
+   */
+  public void update(BigInteger bigInt, short scale) {
+    this.scale = scale;
+    this.signum = (byte) bigInt.compareTo(BigInteger.ZERO);
+    if (signum == 0) {
+      update(0);
+    } else if (signum < 0) {
+      unscaledValue.update(bigInt.negate());
+    } else {
+      unscaledValue.update(bigInt);
+    }
+  }
+
+  /**
    * Updates the value of this object with the given string.
    *
    * @param str
@@ -1384,7 +1404,7 @@ public int compareTo(Decimal128 val) {
    * @return {@code true} if and only if the specified {@code Object} is a
    *         {@code Decimal128} whose value and scale are equal to this
    *         {@code Decimal128}'s.
-   * @see #compareTo(java.math.Decimal128)
+   * @see #compareTo(Decimal128)
    * @see #hashCode
    */
   @Override
diff --git common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
index d71ebb3..fb3c346 100644
--- common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
+++ common/src/java/org/apache/hadoop/hive/common/type/UnsignedInt128.java
@@ -18,7 +18,6 @@
 import java.math.BigInteger;
 import java.nio.IntBuffer;
 import java.util.Arrays;
-import org.apache.hadoop.hive.common.type.SqlMathUtil;
 
 /**
  * This code was originally written for Microsoft PolyBase.
@@ -117,11 +116,7 @@ public UnsignedInt128(UnsignedInt128 o) {
    *          v3
    */
   public UnsignedInt128(int v0, int v1, int v2, int v3) {
-    this.v[0] = v0;
-    this.v[1] = v1;
-    this.v[2] = v2;
-    this.v[3] = v3;
-    updateCount();
+    update(v0, v1, v2, v3);
   }
 
   /**
@@ -158,6 +153,32 @@ public UnsignedInt128(char[] str, int offset, int length) {
     update(str, offset, length);
   }
 
+  /**
+   * Constructs from the given BigInteger.
+   *
+   * @param bigInt
+   *          java BigInteger
+   */
+  public UnsignedInt128(BigInteger bigInt) {
+    update(bigInt);
+  }
+
+  /**
+   * Updates the value of this object from the given {@link BigInteger}.
+   * Only positive BigIntegers are expected; behavior is undefined for
+   * negative BigIntegers.
+   *
+   * @param bigInt
+   *          java BigInteger
+   */
+  public void update(BigInteger bigInt) {
+    int v0 = bigInt.intValue();
+    int v1 = bigInt.shiftRight(32).intValue();
+    int v2 = bigInt.shiftRight(64).intValue();
+    int v3 = bigInt.shiftRight(96).intValue();
+    update(v0, v1, v2, v3);
+  }
+
   /** @return v[0] */
   public int getV0() {
     return v[0];
diff --git common/src/test/org/apache/hadoop/hive/common/type/TestUnsignedInt128.java common/src/test/org/apache/hadoop/hive/common/type/TestUnsignedInt128.java
index fbb2aa0..9ac68fe 100644
--- common/src/test/org/apache/hadoop/hive/common/type/TestUnsignedInt128.java
+++ common/src/test/org/apache/hadoop/hive/common/type/TestUnsignedInt128.java
@@ -547,4 +547,11 @@ public void testDivideDestructiveUnsignedInt128Again() {
     BigInteger ans = bigInteger1.divide(bigInteger2);
     assertEquals(ans, complicated1.toBigIntegerSlow());
   }
+
+  @Test
+  public void testBigIntConversion() {
+    BigInteger bigInteger = BigInteger.valueOf(0x1ABCDEF0123456L);
+    UnsignedInt128 uInt128 = new UnsignedInt128(bigInteger);
+    assertEquals(bigInteger, uInt128.toBigIntegerSlow());
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 2d66b86..d0d8597 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 public class DecimalColumnVector extends ColumnVector {
@@ -37,11 +40,17 @@
   public short scale;
   public short precision;
+  private final HiveDecimalWritable writableObj = new HiveDecimalWritable();
+
   public DecimalColumnVector(int precision, int scale) {
-    super(VectorizedRowBatch.DEFAULT_SIZE);
+    this(VectorizedRowBatch.DEFAULT_SIZE, precision, scale);
+  }
+
+  public DecimalColumnVector(int size, int precision, int scale) {
+    super(size);
     this.precision = (short) precision;
     this.scale = (short) scale;
-    final int len = VectorizedRowBatch.DEFAULT_SIZE;
+    final int len = size;
     vector = new Decimal128[len];
     for (int i = 0; i < len; i++) {
       vector[i] = new Decimal128(0, this.scale);
     }
@@ -50,8 +59,16 @@ public DecimalColumnVector(int precision, int scale) {
 
   @Override
   public Writable getWritableObject(int index) {
-    // TODO Auto-generated method stub
-    return null;
+    if (isRepeating) {
+      index = 0;
+    }
+    if (!noNulls && isNull[index]) {
+      return NullWritable.get();
+    } else {
+      Decimal128 dec = vector[index];
+      writableObj.set(HiveDecimal.create(dec.toBigDecimal()));
+      return writableObj;
+    }
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 0df82b9..e7e5e6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -19,6 +19,7 @@
 import java.io.EOFException;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -34,11 +35,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.*;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -1043,6 +1040,7 @@ void skipRows(long items) throws IOException {
   private static class DecimalTreeReader extends TreeReader{
     private InStream valueStream;
     private IntegerReader scaleStream = null;
+    private LongColumnVector scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
 
     private final int precision;
     private final int scale;
@@ -1093,8 +1091,50 @@ Object next(Object previous) throws IOException {
 
     @Override
     Object nextVector(Object previousVector, long batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Decimal type");
+      DecimalColumnVector result = null;
+      if (previousVector == null) {
+        result = new DecimalColumnVector(precision, scale);
+      } else {
+        result = (DecimalColumnVector) previousVector;
+      }
+
+      // Save the reference for isNull in the scratch vector
+      boolean[] scratchIsNull = scratchScaleVector.isNull;
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      if (result.isRepeating) {
+        if (!result.isNull[0]) {
+          BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+          short scaleInData = (short) scaleStream.next();
+          result.vector[0].update(bInt, scaleInData);
+
+          // Change the scale to match the schema if the scale in data is different.
+          if (scale != scaleInData) {
+            result.vector[0].changeScaleDestructive((short) scale);
+          }
+        }
+      } else {
+        // result vector has isNull values set, use the same to read scale vector.
+        scratchScaleVector.isNull = result.isNull;
+        scaleStream.nextVector(scratchScaleVector, batchSize);
+        for (int i = 0; i < batchSize; i++) {
+          if (!result.isNull[i]) {
+            BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+            result.vector[i].update(bInt, (short) scratchScaleVector.vector[i]);
+
+            // Change the scale to match the schema if the scale in data is different.
+            if (scale != scratchScaleVector.vector[i]) {
+              result.vector[i].changeScaleDestructive((short) scale);
+            }
+          }
+        }
+      }
+      // Switch back the null vector.
+      scratchScaleVector.isNull = scratchIsNull;
+      return result;
     }
 
     @Override
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
index 0d5b7ff..23d89df 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
@@ -22,8 +22,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.io.BooleanWritable;
@@ -71,9 +74,10 @@ public void openFileSystem() throws Exception {
     private final String k;
     private final Timestamp t;
     private final Date dt;
+    private final HiveDecimal hd;
 
     MyRecord(Boolean bo, Byte by, Integer i, Long l, Short s, Double d, String k, Timestamp t,
-        Date dt) {
+        Date dt, HiveDecimal hd) {
       this.bo = bo;
       this.by = by;
      this.i = i;
@@ -83,6 +87,7 @@ public void openFileSystem() throws Exception {
       this.k = k;
       this.t = t;
       this.dt = dt;
+      this.hd = hd;
     }
   }
 
@@ -109,13 +114,15 @@ public void createFile() throws Exception {
         "Heaven,", "we", "were", "all", "going", "direct", "the", "other", "way"};
     String[] dates = new String[] {"1991-02-28", "1970-01-31", "1950-04-23"};
+    String[] decimalStrings = new String[] {"234.443", "10001000", "0.3333367", "67788798.0", "-234.443",
+        "-10001000", "-0.3333367", "-67788798.0", "0"};
     for (int i = 0; i < 21000; ++i) {
       if ((i % 7) != 0) {
         writer.addRow(new MyRecord(((i % 3) == 0), (byte)(i % 5), i, (long) 200,
             (short) (300 + i), (double) (400 + i), words[r1.nextInt(words.length)],
             new Timestamp(Calendar.getInstance().getTime().getTime()),
-            Date.valueOf(dates[i % 3])));
+            Date.valueOf(dates[i % 3]), HiveDecimal.create(decimalStrings[i % decimalStrings.length])));
       } else {
-        writer.addRow(new MyRecord(null, null, i, (long) 200, null, null, null, null, null));
+        writer.addRow(new MyRecord(null, null, i, (long) 200, null, null, null, null, null, null));
       }
     }
     writer.close();
@@ -162,6 +169,14 @@ private void checkVectorizedReader() throws Exception {
           Assert.assertEquals(adt.getTime(), DateWritable.daysToMillis((int) ((LongWritable) b).get()));
           continue;
         }
+
+        // Decimals are stored as BigInteger, so convert and compare
+        if (a instanceof HiveDecimal) {
+          HiveDecimalWritable dec = (HiveDecimalWritable) b;
+          Assert.assertEquals(a, dec.getHiveDecimal());
+          continue;
+        }
+
         if (null == a) {
           Assert.assertEquals(true, (b == null || (b instanceof NullWritable)));
         } else {
@@ -180,6 +195,7 @@
       Assert.assertEquals(false, batch.cols[6].isRepeating);
       Assert.assertEquals(false, batch.cols[7].isRepeating);
       Assert.assertEquals(false, batch.cols[8].isRepeating);
+      Assert.assertEquals(false, batch.cols[9].isRepeating);
 
       // Check non null
       Assert.assertEquals(false, batch.cols[0].noNulls);
@@ -191,6 +207,7 @@
       Assert.assertEquals(false, batch.cols[6].noNulls);
       Assert.assertEquals(false, batch.cols[7].noNulls);
       Assert.assertEquals(false, batch.cols[8].noNulls);
+      Assert.assertEquals(false, batch.cols[9].noNulls);
     }
     Assert.assertEquals(false, rr.hasNext());
   }
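
Reviewer note, illustrative only and not part of the patch: the snippet below is a minimal sketch of how the Decimal128.update(BigInteger, short) path added above can be exercised on its own, outside the ORC reader. It uses only constructors and methods that appear in this patch (the Decimal128(0, scale) constructor form, update(BigInteger, short), and toBigDecimal()); the class name Decimal128UpdateSketch and the sample values are made up for the example.

import java.math.BigDecimal;
import java.math.BigInteger;

import org.apache.hadoop.hive.common.type.Decimal128;

public class Decimal128UpdateSketch {
  public static void main(String[] args) {
    // Unscaled value 123456789 with scale 3 represents 123456.789.
    BigInteger unscaled = BigInteger.valueOf(123456789L);

    Decimal128 dec = new Decimal128(0, (short) 3);
    dec.update(unscaled, (short) 3);

    // Round-tripping through BigDecimal should preserve the unscaled value and scale.
    System.out.println(dec.toBigDecimal());          // 123456.789
    System.out.println(new BigDecimal(unscaled, 3)); // 123456.789

    // Negative inputs are stored as a positive magnitude with signum = -1,
    // so the signed value still comes back correctly.
    dec.update(unscaled.negate(), (short) 3);
    System.out.println(dec.toBigDecimal());          // -123456.789
  }
}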