diff --git itests/hive-jmh/src/main/java/org/apache/hive/benchmark/serde/LazySimpleSerDeBench.java itests/hive-jmh/src/main/java/org/apache/hive/benchmark/serde/LazySimpleSerDeBench.java
index a1b63d5..826bf53 100644
--- itests/hive-jmh/src/main/java/org/apache/hive/benchmark/serde/LazySimpleSerDeBench.java
+++ itests/hive-jmh/src/main/java/org/apache/hive/benchmark/serde/LazySimpleSerDeBench.java
@@ -15,16 +15,19 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Date;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyByte;
+import org.apache.hadoop.hive.serde2.lazy.LazyDate;
 import org.apache.hadoop.hive.serde2.lazy.LazyDouble;
 import org.apache.hadoop.hive.serde2.lazy.LazyFloat;
 import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
 import org.apache.hadoop.hive.serde2.lazy.LazyLong;
 import org.apache.hadoop.hive.serde2.lazy.LazyShort;
+import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyPrimitiveObjectInspectorFactory;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -53,15 +56,14 @@
  * $ java -cp target/benchmarks.jar org.apache.hive.benchmark.serde.LazySimpleSerDeBench
  *
  */
+  public static final int DEFAULT_ITER_TIME = 1000000;
+  public static final int DEFAULT_DATA_SIZE = 4096;
 
   @BenchmarkMode(Mode.AverageTime)
   @Fork(1)
   @State(Scope.Thread)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public static abstract class AbstractDeserializer {
-    public static final int DEFAULT_ITER_TIME = 1000000;
-
-    public static final int DEFAULT_DATA_SIZE = 4096;
 
     public int[] offsets = new int[DEFAULT_DATA_SIZE];
     public int[] sizes = new int[DEFAULT_DATA_SIZE];
@@ -445,6 +447,171 @@
     public void bench() {
     }
   }
 
+  @BenchmarkMode(Mode.AverageTime)
+  @Fork(1)
+  @State(Scope.Thread)
+  @OutputTimeUnit(TimeUnit.NANOSECONDS)
+  public static class GoodLazyDate {
+
+    final LazyDate obj = new LazyDate(
+        LazyPrimitiveObjectInspectorFactory.LAZY_DATE_OBJECT_INSPECTOR);
+
+    public int[] offsets = new int[DEFAULT_DATA_SIZE];
+    public int[] sizes = new int[DEFAULT_DATA_SIZE];
+    protected final ByteArrayRef ref = new ByteArrayRef();
+
+    @Setup
+    public void setup() {
+      sizes = new int[DEFAULT_DATA_SIZE];
+      offsets = new int[sizes.length];
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Random r = new Random();
+      int len = 0;
+      final long base = -320000000L * 1000L; // 1959
+      for (int i = 0; i < DEFAULT_DATA_SIZE; i++) {
+        // negative epoch offsets are also valid dates - the generated dates fall between 1959 and 2027
+        Date dt = new Date(base + (Math.abs(r.nextLong()) % (Integer.MAX_VALUE * 1000L)));
+        byte[] ds = dt.toString().getBytes();
+        sizes[i] = ds.length;
+        offsets[i] = len;
+        len += ds.length;
+        try {
+          bos.write(ds);
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+      ref.setData(bos.toByteArray());
+    }
+
+    @Benchmark
+    @Warmup(iterations = 2, time = 2, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.MILLISECONDS)
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
+  public static class RandomLazyDate extends RandomDataInitializer {
+
+    final LazyDate obj = new LazyDate(
+        LazyPrimitiveObjectInspectorFactory.LAZY_DATE_OBJECT_INSPECTOR);
+
+    public RandomLazyDate() {
+      super(4);
+    }
+
+    @Override
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
+  public static class WorstLazyDate extends RandomDataInitializer {
+
+    final LazyDate obj = new LazyDate(
+        LazyPrimitiveObjectInspectorFactory.LAZY_DATE_OBJECT_INSPECTOR);
+
+    public WorstLazyDate() {
+      super(8);
+    }
+
+    @Override
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
+  @BenchmarkMode(Mode.AverageTime)
+  @Fork(1)
+  @State(Scope.Thread)
+  @OutputTimeUnit(TimeUnit.NANOSECONDS)
+  public static class GoodLazyTimestamp {
+
+    final LazyTimestamp obj = new LazyTimestamp(
+        LazyPrimitiveObjectInspectorFactory.LAZY_TIMESTAMP_OBJECT_INSPECTOR);
+
+    public int[] offsets = new int[DEFAULT_DATA_SIZE];
+    public int[] sizes = new int[DEFAULT_DATA_SIZE];
+    protected final ByteArrayRef ref = new ByteArrayRef();
+
+    @Setup
+    public void setup() {
+      sizes = new int[DEFAULT_DATA_SIZE];
+      offsets = new int[sizes.length];
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Random r = new Random();
+      int len = 0;
+      final long base = -320000000L * 1000L; // 1959
+      for (int i = 0; i < DEFAULT_DATA_SIZE; i++) {
+        // negative epoch offsets are also valid timestamps - the generated dates fall between 1959 and 2027
+        Date dt = new Date(base + (Math.abs(r.nextLong()) % (Integer.MAX_VALUE * 1000L)));
+        byte[] ds = String.format("%s 00:00:01", dt.toString()).getBytes();
+        sizes[i] = ds.length;
+        offsets[i] = len;
+        len += ds.length;
+        try {
+          bos.write(ds);
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+      ref.setData(bos.toByteArray());
+    }
+
+    @Benchmark
+    @Warmup(iterations = 2, time = 2, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.MILLISECONDS)
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
+  public static class RandomLazyTimestamp extends RandomDataInitializer {
+
+    final LazyTimestamp obj = new LazyTimestamp(
+        LazyPrimitiveObjectInspectorFactory.LAZY_TIMESTAMP_OBJECT_INSPECTOR);
+
+    public RandomLazyTimestamp() {
+      super(4);
+    }
+
+    @Override
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
+  public static class WorstLazyTimestamp extends RandomDataInitializer {
+
+    final LazyTimestamp obj = new LazyTimestamp(
+        LazyPrimitiveObjectInspectorFactory.LAZY_TIMESTAMP_OBJECT_INSPECTOR);
+
+    public WorstLazyTimestamp() {
+      super(8);
+    }
+
+    @Override
+    public void bench() {
+      for (int i = 0; i < DEFAULT_ITER_TIME; i++) {
+        obj.init(ref, offsets[i % sizes.length], sizes[i % sizes.length]);
+      }
+    }
+  }
+
   public static void main(String[] args) throws RunnerException {
     Options opt = new OptionsBuilder().include(
         ".*" + LazySimpleSerDeBench.class.getSimpleName() + ".*").build();
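The epoch arithmetic in the setup() methods above can be sanity-checked standalone. The following is an illustrative sketch, not part of the patch; the class name is invented:

import java.util.Date;
import java.util.Random;

// Sanity check for the benchmark's epoch arithmetic: base is roughly 10.1
// years before 1970 (late 1959), and the largest offset is Integer.MAX_VALUE
// seconds (about 68 years), so the upper bound lands in late 2027.
public class DateRangeCheck {
  public static void main(String[] args) {
    final long base = -320000000L * 1000L;       // milliseconds
    final long span = Integer.MAX_VALUE * 1000L; // milliseconds

    System.out.println("earliest: " + new Date(base));
    System.out.println("latest:   " + new Date(base + span - 1));

    // The same draw the benchmark makes. Note that Math.abs(Long.MIN_VALUE)
    // is still negative, so one draw in 2^64 falls outside the range.
    Random r = new Random();
    System.out.println("sample:   " + new Date(base + (Math.abs(r.nextLong()) % span)));
  }
}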
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
index 894ef59..110a5e9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
@@ -20,6 +20,7 @@
 
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -117,6 +118,7 @@
   private void allocateArrays(int count) {
     isConvert = new boolean[count];
     projectionColumnNums = new int[count];
+    Arrays.fill(projectionColumnNums, -1);
     targetCategories = new Category[count];
     targetPrimitiveCategories = new PrimitiveCategory[count];
     maxLengths = new int[count];
@@ -337,7 +339,9 @@ public void assignRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex,
       Object object) {
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
     if (object == null) {
-      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      if (projectionColumnNum != -1) {
+        VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      }
       return;
     }
     Category targetCategory = targetCategories[logicalColumnIndex];
@@ -495,7 +499,9 @@ public void assignConvertRowColumn(VectorizedRowBatch batch, int batchIndex,
     Preconditions.checkState(isConvert[logicalColumnIndex]);
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
     if (object == null) {
-      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      if (projectionColumnNum != -1) {
+        VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      }
       return;
     }
     try {
@@ -720,6 +726,11 @@ public void assignRow(VectorizedRowBatch batch, int batchIndex,
       List<Object> standardObjects, int columnCount) {
     for (int i = 0; i < columnCount; i++) {
+      Object col = null;
+      if (projectionColumnNums[i] != -1) {
+        // this is too little, too late because the Row has already been deserialized
+        col = standardObjects.get(i);
+      }
       if (isConvert[i]) {
         assignConvertRowColumn(batch, batchIndex, i, standardObjects.get(i));
       } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 2e8331a..e92d7b1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -20,6 +20,7 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -125,6 +126,7 @@ private VectorDeserializeRow() {
   private void allocateArrays(int count) {
     isConvert = new boolean[count];
     projectionColumnNums = new int[count];
+    Arrays.fill(projectionColumnNums, -1);
     sourceCategories = new Category[count];
     sourcePrimitiveCategories = new PrimitiveCategory[count];
     maxLengths = new int[count];
@@ -344,7 +346,10 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
       int logicalColumnIndex) throws IOException {
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
     if (deserializeRead.readCheckNull()) {
-      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      if (projectionColumnNum != -1) {
+        // important: readCheckNull skips the actual data column
+        VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      }
       return;
     }
 
@@ -482,7 +487,10 @@ private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchIndex,
       int logicalColumnIndex) throws IOException {
     final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
     if (deserializeRead.readCheckNull()) {
-      VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      if (projectionColumnNum != -1) {
+        // important: readCheckNull skips the actual data column
+        VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
+      }
       return;
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index 6979956..0cab25e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -28,7 +28,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -45,7 +44,6 @@
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -56,9 +54,9 @@
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -400,9 +398,13 @@ private void determineColumnsToInclude(Configuration hconf) {
 
     columnsToIncludeTruncated = null;
 
-    List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(hconf);
-    if (columnsToIncludeTruncatedList != null &&
-        columnsToIncludeTruncatedList.size() > 0 && columnsToIncludeTruncatedList.size() < dataColumnCount ) {
+    List<Integer> columnsToIncludeTruncatedList = null;
+
+    if (oneRootOperator instanceof TableScanOperator) {
+      columnsToIncludeTruncatedList = ((TableScanOperator) oneRootOperator).getNeededColumnIDs();
+    }
+
+    if (columnsToIncludeTruncatedList != null) {
 
       // Partitioned columns will not be in the include list.
 
@@ -421,9 +423,6 @@ private void determineColumnsToInclude(Configuration hconf) {
           break;
         }
       }
-      if (highestWantedColumnNum == -1) {
-        throw new RuntimeException("No columns to include?");
-      }
       int newColumnCount = highestWantedColumnNum + 1;
       if (newColumnCount == dataColumnCount) {
         columnsToIncludeTruncated = columnsToInclude;
@@ -431,7 +430,7 @@ private void determineColumnsToInclude(Configuration hconf) {
         columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
       }
     }
-  } 
+  }
 
   /** Kryo ctor. */
   public VectorMapOperator() {
@@ -479,9 +478,11 @@ private void internalSetChildren(Configuration hconf) throws Exception {
     // so set it here to none.
     currentReadType = VectorMapOperatorReadType.NONE;
 
-    determineColumnsToInclude(hconf);
-
     batchContext = conf.getVectorizedRowBatchCtx();
+    dataColumnCount = batchContext.getDataColumnCount();
+    partitionColumnCount = batchContext.getPartitionColumnCount();
+    partitionValues = new Object[partitionColumnCount];
+    determineColumnsToInclude(hconf);
 
     /*
      * Use a different batch for vectorized Input File Format readers so they can do their work
@@ -499,10 +500,6 @@ private void internalSetChildren(Configuration hconf) throws Exception {
 
     batchCounter = 0;
 
-    dataColumnCount = batchContext.getDataColumnCount();
-    partitionColumnCount = batchContext.getPartitionColumnCount();
-    partitionValues = new Object[partitionColumnCount];
-
     /*
      * Create table related objects
      */
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyDate.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyDate.java
index 0579ff2..c00faac 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyDate.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyDate.java
@@ -59,6 +59,10 @@ public LazyDate(LazyDate copy) {
   @Override
   public void init(ByteArrayRef bytes, int start, int length) {
     String s = null;
+    if (!LazyUtils.isDateMaybe(bytes.getData(), start, length)) {
+      isNull = true;
+      return;
+    }
     try {
       s = Text.decode(bytes.getData(), start, length);
       data.set(Date.valueOf(s));
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyTimestamp.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyTimestamp.java
index 8f0c3d2..56945d1 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyTimestamp.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyTimestamp.java
@@ -59,6 +59,10 @@ public LazyTimestamp(LazyTimestamp copy) {
   @Override
   public void init(ByteArrayRef bytes, int start, int length) {
     String s = null;
+    if (!LazyUtils.isDateMaybe(bytes.getData(), start, length)) {
+      isNull = true;
+      return;
+    }
     try {
       s = new String(bytes.getData(), start, length, "US-ASCII");
     } catch (UnsupportedEncodingException e) {
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
index 6d7369b..73c72e1 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
@@ -109,6 +109,18 @@ public static boolean isNumberMaybe(byte[] buf, int offset, int len) {
   }
 
   /**
+   * Returns false when the bytes definitely cannot be parsed into a date/timestamp.
+   *
+   * The Y2K and dash requirements mean the string has to be at least
+   * yyyy-m-d = 8 bytes long; a timestamp needs at least 1 byte more. The check
+   * is necessary but not sufficient: passing it does not guarantee a valid date.
+   */
+  public static boolean isDateMaybe(byte[] buf, int offset, int len) {
+    // maybe valid - too expensive to check further without a full parse
+    return len >= 8;
+  }
+
+  /**
    * Returns -1 if the first byte sequence is lexicographically less than the
    * second; returns +1 if the second byte sequence is lexicographically less
    * than the first; otherwise return 0.
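To make the isDateMaybe() contract concrete: it is a pure length check, so it cheaply rejects obvious garbage without decoding bytes, but survivors still need the real parse. A standalone sketch (the demo class and sample values are invented; only the len >= 8 rule comes from the patch):

import java.sql.Date;

// Necessary-but-not-sufficient: reject strings shorter than the shortest
// well-formed date (yyyy-m-d, 8 bytes); anything longer must still parse.
public class IsDateMaybeDemo {

  // Same length-only check the patch adds to LazyUtils.
  static boolean isDateMaybe(byte[] buf, int offset, int len) {
    return len >= 8;
  }

  public static void main(String[] args) {
    String[] candidates = { "x", "1,2,3", "2016-1-1", "2016-13-99", "2016-04-01" };
    for (String c : candidates) {
      byte[] b = c.getBytes();
      if (!isDateMaybe(b, 0, b.length)) {
        System.out.println(c + " -> rejected by the cheap length check");
      } else {
        try {
          System.out.println(c + " -> parsed as " + Date.valueOf(c));
        } catch (IllegalArgumentException e) {
          // survived the length check but is still not a date
          System.out.println(c + " -> rejected by the full parse");
        }
      }
    }
  }
}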
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
index 7e9f94e..52ff89f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
@@ -20,7 +20,10 @@
 
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.util.Arrays;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -34,6 +37,7 @@
 import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
 import org.apache.hadoop.hive.serde2.lazy.LazyShort;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Text;
@@ -57,29 +61,28 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
   public static final Logger LOG = LoggerFactory.getLogger(LazySimpleDeserializeRead.class.getName());
 
-  private int[] startPosition;
+  private final int[] startPosition;
 
-  private byte separator;
-  private boolean isEscaped;
-  private byte escapeChar;
-  private byte[] nullSequenceBytes;
-  private boolean isExtendedBooleanLiteral;
-  private boolean lastColumnTakesRest;
+  private final byte separator;
+  private final boolean isEscaped;
+  private final byte escapeChar;
+  private final byte[] nullSequenceBytes;
+  private final boolean isExtendedBooleanLiteral;
+  private final boolean lastColumnTakesRest;
 
   private byte[] bytes;
   private int start;
-  private int offset;
   private int end;
   private int fieldCount;
   private int fieldIndex;
-  private int fieldStart;
-  private int fieldLength;
+  private boolean hasEscapes;
+  private int minFieldCount;
 
-  private Text tempText;
-  private TimestampParser timestampParser;
+  private final Text tempText;
+  private final TimestampParser timestampParser;
 
-  private boolean extraFieldWarned;
-  private boolean missingFieldWarned;
+  private boolean extraFieldWarned = false;
+  private boolean missingFieldWarned = false;
 
   public LazySimpleDeserializeRead(TypeInfo[] typeInfos, byte separator,
       LazySerDeParameters lazyParams) {
@@ -97,9 +100,15 @@ public LazySimpleDeserializeRead(TypeInfo[] typeInfos,
     lastColumnTakesRest = lazyParams.isLastColumnTakesRest();
     fieldCount = typeInfos.length;
 
-    tempText = new Text();
-    extraFieldWarned = false;
-    missingFieldWarned = false;
+    if (isEscaped) {
+      tempText = new Text();
+    } else {
+      tempText = null;
+    }
+
+    timestampParser = new TimestampParser();
+    // setup min fields
+    setColumnsToInclude(columnsToInclude);
   }
 
   public LazySimpleDeserializeRead(TypeInfo[] typeInfos, LazySerDeParameters lazyParams) {
@@ -107,8 +116,20 @@ public LazySimpleDeserializeRead(TypeInfo[] typeInfos, LazySerDeParameters lazyParams) {
   }
 
   // Not public since we must have the field count so every 8 fields NULL bytes can be navigated.
+  @SuppressWarnings("unused")
   private LazySimpleDeserializeRead() {
     super();
+    minFieldCount = fieldCount = -1;
+    separator = 0;
+    tempText = null;
+    timestampParser = null;
+
+    isEscaped = false;
+    escapeChar = '\0';
+    nullSequenceBytes = null;
+    isExtendedBooleanLiteral = false;
+    lastColumnTakesRest = false;
+    startPosition = null;
   }
 
   /*
   *
@@ -117,10 +138,24 @@ private LazySimpleDeserializeRead() {
   */
   @Override
   public void set(byte[] bytes, int offset, int length) {
     this.bytes = bytes;
-    this.offset = offset;
-    start = offset;
-    end = offset + length;
-    fieldIndex = -1;
+    this.start = offset;
+    this.end = offset + length;
+    this.fieldIndex = -1;
+  }
+
+  @Override
+  public void setColumnsToInclude(boolean[] columnsToInclude) {
+    super.setColumnsToInclude(columnsToInclude);
+    int lastUsedField = fieldCount - 1; // last index
+    if (columnsToInclude != null) {
+      lastUsedField = -1; // in case every entry in columnsToInclude is false
+      for (int i = 0; i < columnsToInclude.length; i++) {
+        if (columnsToInclude[i]) {
+          lastUsedField = i;
+        }
+      }
+    }
+    minFieldCount = Math.min(fieldCount, lastUsedField + 1);
   }
 
   /**
@@ -131,56 +166,80 @@ public void set(byte[] bytes, int offset, int length) {
   */
   private void parse() {
-    int structByteEnd = end;
+    final int structByteEnd = end;
+    final byte delimiter = separator;
+    final int fieldCount = this.fieldCount;
+    final int minFieldCount = this.minFieldCount;
     int fieldId = 0;
     int fieldByteBegin = start;
     int fieldByteEnd = start;
+    final int[] pos = startPosition;
 
     // Go through all bytes in the byte[]
     while (fieldByteEnd <= structByteEnd) {
-      if (fieldByteEnd == structByteEnd || bytes[fieldByteEnd] == separator) {
+      final byte b; // note: final so the compiler proves b is assigned before the escape test
+      if (fieldByteEnd == structByteEnd
+          || (b = bytes[fieldByteEnd]) == delimiter) { // assignment + check
         // Reached the end of a field?
         if (lastColumnTakesRest && fieldId == fieldCount - 1) {
           fieldByteEnd = structByteEnd;
         }
-        startPosition[fieldId] = fieldByteBegin;
+        pos[fieldId] = fieldByteBegin;
         fieldId++;
-        if (fieldId == fieldCount || fieldByteEnd == structByteEnd) {
+        if (fieldId == minFieldCount || fieldByteEnd == structByteEnd) {
           // All fields have been parsed, or bytes have been parsed.
           // We need to set the startPosition of fields.length to ensure we
           // can use the same formula to calculate the length of each field.
           // For missing fields, their starting positions will all be the same,
           // which will make their lengths to be -1 and uncheckedGetField will
           // return these fields as NULLs.
-          for (int i = fieldId; i <= fieldCount; i++) {
-            startPosition[i] = fieldByteEnd + 1;
-          }
+          Arrays.fill(pos, fieldId, pos.length, fieldByteEnd + 1);
          break;
        }
        fieldByteBegin = fieldByteEnd + 1;
-        fieldByteEnd++;
-      } else {
-        if (isEscaped && bytes[fieldByteEnd] == escapeChar
-            && fieldByteEnd + 1 < structByteEnd) {
-          // ignore the char after escape_char
-          fieldByteEnd += 2;
-        } else {
-          fieldByteEnd++;
-        }
+      } else if (b == escapeChar && isEscaped
+          && fieldByteEnd + 1 < structByteEnd) {
+        // ignore the char after escape_char and remember that this row needs unescaping
+        hasEscapes = true;
+        fieldByteEnd += 1;
       }
+      fieldByteEnd++;
     }
 
-    // Extra bytes at the end?
-    if (!extraFieldWarned && fieldByteEnd < structByteEnd) {
+    // Extra bytes at the end? and all fields were supposed to be parsed
+    if (fieldByteEnd < structByteEnd
+        && minFieldCount == fieldCount
+        && !extraFieldWarned) {
      doExtraFieldWarned();
    }
 
    // Missing fields?
-    if (!missingFieldWarned && fieldId < fieldCount) {
+    if (fieldId < minFieldCount && !missingFieldWarned) {
       doMissingFieldWarned(fieldId);
     }
   }
 
+  private boolean checkNull(byte[] bytes, int start, int len) {
+    if (len != nullSequenceBytes.length) {
+      return false;
+    }
+    switch (len) {
+    case 0:
+      return true;
+    case 2:
+      return bytes[start] == nullSequenceBytes[0] && bytes[start + 1] == nullSequenceBytes[1];
+    case 4:
+      return bytes[start] == nullSequenceBytes[0] && bytes[start + 1] == nullSequenceBytes[1]
+          && bytes[start + 2] == nullSequenceBytes[2] && bytes[start + 3] == nullSequenceBytes[3];
+    default:
+      for (int i = 0; i < nullSequenceBytes.length; i++) {
+        if (bytes[start + i] != nullSequenceBytes[i]) {
+          return false;
+        }
      }
+      return true;
+    }
+  }
+
   /*
    * Reads the NULL information for a field.
    *
@@ -189,10 +248,15 @@
   */
   @Override
   public boolean readCheckNull() {
+
+    if (minFieldCount == 0) {
+      return true;
+    }
+
     if (fieldIndex == -1) {
       parse();
       fieldIndex = 0;
-    } else if (fieldIndex + 1 >= fieldCount) {
+    } else if (fieldIndex + 1 >= minFieldCount) {
       return true;
     } else {
       fieldIndex++;
@@ -203,54 +267,48 @@
       return true;
     }
 
-    fieldStart = startPosition[fieldIndex];
-    fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+    final int fieldStart = startPosition[fieldIndex];
+    final int fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
     if (fieldLength < 0) {
       return true;
     }
 
     // Is the field the configured string representing NULL?
     if (nullSequenceBytes != null) {
-      if (fieldLength == nullSequenceBytes.length) {
-        int i = 0;
-        while (true) {
-          if (bytes[fieldStart + i] != nullSequenceBytes[i]) {
-            break;
-          }
-          i++;
-          if (i >= fieldLength) {
-            return true;
-          }
-        }
+      if (checkNull(bytes, fieldStart, fieldLength)) {
+        return true;
       }
     }
 
+    return checkNullInternal(fieldStart, fieldLength);
+  }
 
-    /*
-     * We have a field and are positioned to it.  Read it.
-     */
-    switch (primitiveCategories[fieldIndex]) {
-    case BOOLEAN:
-      {
+  private boolean checkNullInternal(final int fieldStart, final int fieldLength) {
+    try {
+      /*
+       * We have a field and are positioned to it.  Read it.
+       */
+      switch (primitiveCategories[fieldIndex]) {
+      case BOOLEAN: {
         int i = fieldStart;
         if (fieldLength == 4) {
-          if ((bytes[i] == 'T' || bytes[i] == 't') &&
-              (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
-              (bytes[i + 2] == 'U' || bytes[i + 1] == 'u') &&
-              (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
+          if ((bytes[i] == 'T' || bytes[i] == 't')
+              && (bytes[i + 1] == 'R' || bytes[i + 1] == 'r')
+              && (bytes[i + 2] == 'U' || bytes[i + 2] == 'u')
+              && (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
             currentBoolean = true;
           } else {
-            // No boolean value match for 5 char field.
+            // No boolean value match for 4 char field.
             return true;
           }
         } else if (fieldLength == 5) {
-          if ((bytes[i] == 'F' || bytes[i] == 'f') &&
-              (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
-              (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
-              (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
-              (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
+          if ((bytes[i] == 'F' || bytes[i] == 'f')
+              && (bytes[i + 1] == 'A' || bytes[i + 1] == 'a')
+              && (bytes[i + 2] == 'L' || bytes[i + 2] == 'l')
+              && (bytes[i + 3] == 'S' || bytes[i + 3] == 's')
+              && (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
             currentBoolean = false;
           } else {
-            // No boolean value match for 4 char field.
+            // No boolean value match for 5 char field.
             return true;
           }
         } else if (isExtendedBooleanLiteral && fieldLength == 1) {
@@ -269,106 +327,68 @@ public boolean readCheckNull() {
          }
        }
        break;
-    case BYTE:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
+      case BYTE: {
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return true;
+        }
         currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "TINYINT");
-        return true;
       }
       break;
-    case SHORT:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
+      case SHORT: {
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return true;
+        }
         currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "SMALLINT");
-        return true;
       }
       break;
-    case INT:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
+      case INT: {
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return true;
+        }
         currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "INT");
-        return true;
      }
      break;
-    case LONG:
-      if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
-        return true;
-      }
-      try {
+      case LONG: {
+        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+          return true;
+        }
         currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
-      } catch (NumberFormatException e) {
-        logExceptionMessage(bytes, fieldStart, fieldLength, "BIGINT");
-        return true;
      }
      break;
-    case FLOAT:
-      {
+      case FLOAT: {
        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
          return true;
        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-          currentFloat = Float.parseFloat(byteData);
-        } catch (NumberFormatException e) {
-          LOG.debug("Data not in the Float data type range so converted to null. Given data is :"
-              + byteData, e);
-          return true;
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the Float data type range so converted to null.", e);
-          return true;
-        }
+        String byteData = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+        currentFloat = Float.parseFloat(byteData);
      }
      break;
-    case DOUBLE:
-      {
+      case DOUBLE: {
        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
          return true;
        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-          currentDouble = Double.parseDouble(byteData);
-        } catch (NumberFormatException e) {
-          LOG.debug("Data not in the Double data type range so converted to null. Given data is :"
-              + byteData, e);
-          return true;
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the Double data type range so converted to null.", e);
-          return true;
-        }
+        String byteData = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+        currentDouble = Double.parseDouble(byteData);
      }
      break;
-
-    case STRING:
-    case CHAR:
-    case VARCHAR:
-      if (isEscaped) {
-        LazyUtils.copyAndEscapeStringDataToText(bytes, fieldStart, fieldLength, escapeChar, tempText);
-        currentBytes = tempText.getBytes();
-        currentBytesStart = 0;
-        currentBytesLength = tempText.getLength();
-      } else {
-        // if the data is not escaped, simply copy the data.
-        currentBytes = bytes;
-        currentBytesStart = fieldStart;
-        currentBytesLength = fieldLength;
+      case STRING:
+      case CHAR:
+      case VARCHAR: {
+        if (isEscaped && hasEscapes) {
+          LazyUtils.copyAndEscapeStringDataToText(bytes, fieldStart, fieldLength, escapeChar,
+              tempText);
+          currentBytes = tempText.getBytes();
+          currentBytesStart = 0;
+          currentBytesLength = tempText.getLength();
+        } else {
+          // if the data is not escaped, simply copy the data.
+          currentBytes = bytes;
+          currentBytesStart = fieldStart;
+          currentBytesLength = fieldLength;
+        }
      }
      break;
-    case BINARY:
-      {
+      case BINARY: {
        byte[] recv = new byte[fieldLength];
        System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
        byte[] decoded = LazyBinary.decodeIfNeeded(recv);
@@ -379,14 +399,12 @@ public boolean readCheckNull() {
          currentBytesLength = decoded.length;
        }
        break;
-    case DATE:
-      {
-        if (fieldLength == 0) {
+      case DATE: {
+        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
          return true;
        }
-        String s = null;
        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
          currentDateWritable.set(Date.valueOf(s));
        } catch (Exception e) {
          logExceptionMessage(bytes, fieldStart, fieldLength, "DATE");
@@ -394,27 +412,17 @@ public boolean readCheckNull() {
        }
      }
      break;
-    case TIMESTAMP:
-      {
-        if (fieldLength == 0) {
+      case TIMESTAMP: {
+        if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
          return true;
        }
-        String s = null;
-        try {
-          s = new String(bytes, fieldStart, fieldLength, "US-ASCII");
-        } catch (UnsupportedEncodingException e) {
-          LOG.error("Unsupported encoding found ", e);
-          s = "";
-        }
+        String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.US_ASCII);
        if (s.compareTo("NULL") == 0) {
          logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
          return true;
        } else {
          try {
-            if (timestampParser == null) {
-              timestampParser = new TimestampParser();
-            }
            currentTimestampWritable.set(timestampParser.parseTimestamp(s));
          } catch (IllegalArgumentException e) {
            logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
@@ -423,23 +431,25 @@
          }
        }
        break;
-    case INTERVAL_YEAR_MONTH:
-      {
-        String s = null;
+      case INTERVAL_YEAR_MONTH: {
+        if (fieldLength == 0) {
+          return true;
+        }
        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
          currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
        } catch (Exception e) {
          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
          return true;
        }
      }
-      break;
-    case INTERVAL_DAY_TIME:
-      {
-        String s = null;
+      break;
+      case INTERVAL_DAY_TIME: {
+        if (fieldLength == 0) {
+          return true;
+        }
        try {
-          s = Text.decode(bytes, fieldStart, fieldLength);
+          String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
          currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
        } catch (Exception e) {
          logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
@@ -447,25 +457,16 @@
          return true;
        }
      }
      break;
-    case DECIMAL:
-      {
+      case DECIMAL: {
        if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
          return true;
        }
-        String byteData = null;
-        try {
-          byteData = Text.decode(bytes, fieldStart, fieldLength);
-        } catch (CharacterCodingException e) {
-          LOG.debug("Data not in the HiveDecimal data type range so converted to null.", e);
-          return true;
-        }
-
+        String byteData = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
        HiveDecimal decimal = HiveDecimal.create(byteData);
        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
        int precision = decimalTypeInfo.getPrecision();
        int scale = decimalTypeInfo.getScale();
-        decimal = HiveDecimal.enforcePrecisionScale(
-            decimal, precision, scale);
+        decimal = HiveDecimal.enforcePrecisionScale(decimal, precision, scale);
        if (decimal == null) {
          LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
              + byteData);
@@ -474,14 +475,38 @@
          currentHiveDecimalWritable.set(decimal);
        }
        break;
-
-    default:
-      throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
+      default:
+        throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
+      }
+    } catch (NumberFormatException nfe) {
+      // U+FFFD replacement characters will throw this as well
+      logExceptionMessage(bytes, fieldStart, fieldLength, primitiveCategories[fieldIndex]);
+      return true;
    }
 
    return false;
  }
 
+  public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength,
+      PrimitiveCategory dataCategory) {
+    final String dataType;
+    switch (dataCategory) {
+    case BYTE:
+      dataType = "TINYINT";
+      break;
+    case LONG:
+      dataType = "BIGINT";
+      break;
+    case SHORT:
+      dataType = "SMALLINT";
+      break;
+    default:
+      dataType = dataCategory.toString();
+      break;
+    }
+    logExceptionMessage(bytes, bytesStart, bytesLength, dataType);
+  }
+
  public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, String dataType) {
    try {
      if(LOG.isDebugEnabled()) {
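The setColumnsToInclude()/minFieldCount change above is the core of this patch: when a query reads only a prefix of the columns, parse() stops scanning for separators after the last included field. A standalone sketch of that idea (class and helper names are invented; the minFieldCount computation mirrors the patch, the scanning loop is deliberately simplified):

// Illustrates why truncating at the last included column saves work:
// trailing columns are never scanned for separators at all.
public class MinFieldCountDemo {

  // Mirrors the patch: the highest included column index bounds how many
  // fields parse() needs to find.
  static int minFieldCount(int fieldCount, boolean[] columnsToInclude) {
    int lastUsedField = fieldCount - 1; // default: parse every field
    if (columnsToInclude != null) {
      lastUsedField = -1; // covers the case where every entry is false
      for (int i = 0; i < columnsToInclude.length; i++) {
        if (columnsToInclude[i]) {
          lastUsedField = i;
        }
      }
    }
    return Math.min(fieldCount, lastUsedField + 1);
  }

  // Simplified stand-in for parse(): count how many bytes are examined
  // before the first minFields field boundaries are located.
  static int bytesScanned(byte[] row, byte separator, int minFields) {
    int fieldId = 0;
    int scanned = 0;
    for (int i = 0; i <= row.length && fieldId < minFields; i++) {
      scanned++;
      if (i == row.length || row[i] == separator) {
        fieldId++;
      }
    }
    return scanned;
  }

  public static void main(String[] args) {
    byte[] row = "1,2,3,4,5".getBytes(); // five fields
    boolean[] include = { true, true, false, false, false }; // query reads col0, col1 only
    int min = minFieldCount(5, include);
    System.out.println("minFieldCount = " + min); // 2
    System.out.println("bytes scanned = " + bytesScanned(row, (byte) ',', min)
        + " of " + (row.length + 1)); // 4 of 10
  }
}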