diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index f513188..676e6c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapred.FileSplit;
 
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. this class does eager deserialization of row data using serde
  * in the RecordReader layer.
  * It has supports partitions in this layer so that the vectorized batch is populated correctly
  * with the partition column.
@@ -96,12 +96,12 @@ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspect
   public VectorizedRowBatchCtx() {
   }
-  
+
   /**
    * Initializes the VectorizedRowBatch context based on an arbitrary object inspector
-   * Used by non-tablescan operators when they change the vectorization context 
+   * Used by non-tablescan operators when they change the vectorization context
    * @param hiveConf
-   * @param fileKey 
+   * @param fileKey
    *          The key on which to retrieve the extra column mapping from the map scratch
    * @param rowOI
    *          Object inspector that shapes the column types
@@ -114,7 +114,7 @@ public void init(Configuration hiveConf, String fileKey,
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }
-  
+
 
   /**
    * Initializes VectorizedRowBatch context based on the
@@ -154,7 +154,7 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
           part.getTableDesc().getProperties() : part.getProperties();
       Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
-      Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); 
+      Deserializer partDeserializer = (Deserializer) serdeclass.newInstance();
       partDeserializer.initialize(hiveConf, partProps);
       StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
           .getObjectInspector();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java
new file mode 100644
index 0000000..b749154
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.*;
+import java.util.ArrayList;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A Vectorized {@link InputFormat} for plain text files in UTF-8 format.
+ */
+public class VectorizedTextInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+    implements InputFormatChecker, VectorizedInputFormatInterface {
+
+  private static class VectorizedTextRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    private float progress = 0.0f;
+    private VectorizedRowBatchCtx rbCtx;
+    private boolean addPartitionCols = true;
+
+    public VectorizedTextRecordReader(JobConf conf, FileSplit split,
+        byte[] recordDelimiterBytes) {
+
+      rbCtx = new VectorizedRowBatchCtx();
+      try {
+        rbCtx.init(conf, split);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      this.offset = split.getStart();
+      this.length = split.getLength();
+
+      reader = new VectorizedTextReaderImpl(conf, split, recordDelimiterBytes, rbCtx);
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+      if (!reader.hasNext()) {
+        return false;
+      }
+      try {
+        // Check and update partition cols if necessary. Ideally, this should be done
+        // in createValue as the partition is constant per split. But since Hive uses
+        // CombineHiveRecordReader, which does not call createValue for each new
+        // RecordReader it creates, this check is required in next().
+        if (addPartitionCols) {
+          rbCtx.addPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
+        reader.nextBatch(value);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      progress = reader.getProgress();
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      VectorizedRowBatch result = null;
+      try {
+        result = rbCtx.createVectorizedRowBatch();
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+      return result;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset + (long) (progress * length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress;
+    }
+  }
+
+  public VectorizedTextInputFormat() {
+  }
+
+  private CompressionCodecFactory compressionCodecs = null;
+
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+
+  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
+      InputSplit genericSplit, JobConf job, Reporter reporter)
+      throws IOException {
+
+    reporter.setStatus(genericSplit.toString());
+    String delimiter = job.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
+    return new VectorizedTextRecordReader(job, (FileSplit) genericSplit,
+        recordDelimiterBytes);
+  }
+
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException {
+
+    // TODO Auto-generated method stub
+    return true;
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java
new file mode 100644
index 0000000..da57ac4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java
@@ -0,0 +1,326 @@
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class VectorizedTextReaderImpl implements RecordReader {
+
+  private JobConf job;
+  private FileSplit split;
+  private byte[] recordDelimiterBytes;
+  private VectorizedRowBatchCtx rbCtx;
+  private FSDataInputStream inStream = null;
+  private long nextReadPosition;
+  private byte[] buf;
+  private int bufferPosition;
+  private TextBatchFrame textBatchFrame;
+  private int numCols = -1; // number of columns in the batch to read
+
+  private long dummyBatchCount = 0;
+
+  public VectorizedTextReaderImpl(JobConf job, FileSplit split,
+      byte[] recordDelimiterBytes, VectorizedRowBatchCtx rbCtx) {
+    this.job = job;
+    this.split = split;
+    this.recordDelimiterBytes = recordDelimiterBytes;
+    this.rbCtx = rbCtx;
+    if (split.getLength() > (long) Integer.MAX_VALUE) {
+      throw new RuntimeException("Internal error: split length won't fit in int");
+    }
+    this.buf = new byte[(int) split.getLength()];
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    // TODO Auto-generated method stub
+    return dummyBatchCount != 5;
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    throw new RuntimeException("row-mode not implemented in VectorizedTextInputFormat");
+  }
+
+  @Override
+  public VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch)
+      throws IOException {
+    if (previousBatch == null) {
+      try {
+        previousBatch = rbCtx.createVectorizedRowBatch();
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (inStream == null) {
+
+      // open file for split and position cursor at start of split
+      Path filePath = split.getPath();
+      FileSystem fs = filePath.getFileSystem(job);
+      inStream = fs.open(filePath);
+      nextReadPosition = split.getStart();
+
+      // read the whole split into the buffer
+      inStream.seek(nextReadPosition);
+      inStream.readFully(buf);
+      bufferPosition = 0; // take next batch at start of buffer
+    }
+
+    // load batch with data from the current position in the split
+    fillBatch(previousBatch);
+
+    // advance to the next position in split
+    advanceBufferPosition();
+
+    return previousBatch;
+  }
+
+  private void advanceBufferPosition() {
+    // TODO Auto-generated method stub
+
+    // dummy for now
+    dummyBatchCount++;
+  }
+
+  // fill next batch of data from text input, starting at nextReadPosition
+  private void fillBatch(VectorizedRowBatch b) {
+    if (textBatchFrame == null) {
+      textBatchFrame = new TextBatchFrame(b.numCols, recordDelimiterBytes);
+    }
+
+    // fill batch with dummy string data for now
+    // fillBatchDummy(b);
+
+    // initialize start and length arrays in frame to point to next batch of data in text buffer
+    int nextPosition = textBatchFrame.init(buf, bufferPosition);
+    textBatchFrame.fill(b);
+  }
+
+
+  // fill string columns of batch with dummy data
+  private void fillBatchDummy(VectorizedRowBatch b) {
+    final int SIZE = 3;
+    byte[] abc = null;
+    try {
+      abc = "abc".getBytes("UTF-8");
+    } catch (Exception e) {
+      ; // eat it -- can't happen
+    }
+    for (int i = 0; i < b.cols.length; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        if (b.cols[i] instanceof BytesColumnVector) {
+          BytesColumnVector bv = (BytesColumnVector) b.cols[i];
+          bv.setRef(j, abc, 0, abc.length);
+        }
+      }
+    }
+    b.size = SIZE;
+  }
+
+  @Override
+  public long getRowNumber() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void seekToRow(long rowCount) throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * A frame pointing into the text fields for one batch worth of data.
+   *
+   * TODO Issues to resolve:
+   *
+   * - find out how to do the mapping between columns in the batch and
+   *   columns in the table. Need to get metadata for the table
+   *   and the query to do the mapping. See if ORC already has
+   *   done this mapping in a way that is usable.
+   *
+   * - determine how to deal with the situation where the text file
+   *   has more or fewer columns of data than are present in the Hive
+   *   table schema. E.g. if a line has 3 fields and the table has 4,
+   *   is the last column of the table considered NULL?
+   *   And if the text file line has 10 fields and the schema has 7
+   *   fields for the table, are the last 3 fields ignored and skipped?
+   */
+  public static class TextBatchFrame {
+    private int numCols;
+    private int start[][];
+    private int length[][];
+    private byte[] delimiter;
+    private int fieldLength; // scratch variable holding the length of the field just scanned
+
+    public static final byte CR = 13; // carriage return
+    public static final byte LF = 10; // line feed
+
+
+    public TextBatchFrame(int numCols, byte[] delimiter) {
+      this.numCols = numCols;
+      this.delimiter = delimiter;
+      fieldLength = 0;
+
+      // create array of columns for start, length
+      start = new int[this.numCols][];
+      length = new int[this.numCols][];
+
+      // create arrays for each column
+      for (int i = 0; i < numCols; i++) {
+        start[i] = new int[VectorizedRowBatch.DEFAULT_SIZE];
+        length[i] = new int[VectorizedRowBatch.DEFAULT_SIZE];
+      }
+    }
+
+    public void fill(VectorizedRowBatch b) {
+      // TODO Auto-generated method stub
+
+    }
+
+    // Initialize start and length arrays in frame to point to next batch of data in text buffer.
+    // Return the next position in the buffer to read from.
+    public int init(byte[] buf, int bufferPosition) {
+      int nextPosition = bufferPosition;
+      for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+        nextPosition = initRecord(i, buf, nextPosition);
+      }
+
+      return nextPosition;
+    }
+
+    // Fill one record in start/length arrays, and return next position to read from.
+    private int initRecord(int row, byte[] buf, int bufferPosition) {
+      int nextPosition = bufferPosition;
+
+      // TODO find line end
+      findLineEnd();
+
+      for (int i = 0; i < numCols; i++) {
+        nextPosition = initField(row, i, buf, nextPosition);
+      }
+      return nextPosition;
+    }
+
+    private void findLineEnd() {
+      // TODO Auto-generated method stub
+
+    }
+
+    // Fill one field, and return next position to read from.
+    private int initField(int row, int col, byte[] buf, int nextPosition) {
+      start[col][row] = nextPosition;
+      nextPosition = advanceField(buf, nextPosition);
+      length[col][row] = fieldLength;
+      return nextPosition;
+    }
+
+    /* Return the next position to read from, setting the fieldLength scratch
+     * field to the length of the field. The next position returned must skip
+     * over the delimiter or the end-of-line. */
+    private int advanceField(byte[] buf, int nextPosition) {
+      int len = 0;
+      while (!startOfDelimiter(buf, nextPosition)) {
+        len++;
+        nextPosition++;
+      }
+      fieldLength = len;
+      nextPosition = advancePastDelimiter();
+      return nextPosition;
+    }
+
+    private static final int END_DELIM = 0;
+    private static final int END_BUFFER = 1;
+    private static final int END_LINE = 2;
+    private static final int NO_DELIM = 3;
+    private static final int START = 4;
+    private static final int FOUND_CR = 5;
+    private static final int CONSUMING_DELIM = 6;
+
+
+    /* Return true if the next character is the start of a delimiter or line end,
+     * or we are at the end of the buffer. */
+    private boolean startOfDelimiter(byte[] buf, int nextPosition) {
+      if (nextPosition >= buf.length) {
+        return true;
+      }
+
+      // Fast path for detecting we are definitely not starting a delimiter.
+      // Be able to handle newline as LF (Unix) or CR-LF (Windows).
+      if (buf[nextPosition] != delimiter[0] && buf[nextPosition] != CR
+          && buf[nextPosition] != LF) {
+        return false;
+      }
+
+      /* Advance to one of these final states:
+       *
+       *   END_DELIM        found a whole field delimiter
+       *   END_BUFFER       end of buffer
+       *   END_LINE         found end of line
+       *   NO_DELIM         this character is not the start of a delimiter
+       *
+       * Intermediate working states:
+       *
+       *   START
+       *   FOUND_CR         consuming end of line (found CR)
+       *   CONSUMING_DELIM  consuming delimiter
+       */
+
+      int state = START;
+      int i = nextPosition;
+      int delimPos = 0;
+
+      search:
+      while (true) {
+        switch (state) {
+        case START:
+          if (nextPosition >= buf.length) {
+            state = END_BUFFER;
+          } else if (buf[i] == delimiter[0]) {
+            state = CONSUMING_DELIM;
+            delimPos++;
+          } else if (buf[i] == LF) {
+            state = END_LINE;
+          } else if (buf[i] == CR) {
+            state = FOUND_CR;
+          }
+          break;
+        case CONSUMING_DELIM:
+          ; // TODO
+        case FOUND_CR:
+          ; // TODO
+        default:
+          break search;
+        }
+      }
+      return false; // TODO fix this
+    }
+
+    private int advancePastDelimiter() {
+      return 0; // TODO fix this
+    }
+
+
+  }
+}
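The delimiter scan in TextBatchFrame.startOfDelimiter() is left above as a TODO skeleton. For illustration only (not part of the patch), the check it describes -- "is this position the start of a field delimiter, a line end, or the end of the buffer?" -- can also be written as a direct byte comparison instead of a state machine. The class and method names below are hypothetical; only CR, LF, and the delimiter-bytes convention are taken from the patch, and treating a lone CR as ordinary data is an assumption.

// Illustrative sketch, not part of the patch: a standalone equivalent of the
// check TextBatchFrame.startOfDelimiter() is meant to perform.
import java.nio.charset.StandardCharsets;

public class DelimiterScanSketch {

  private static final byte CR = 13; // carriage return
  private static final byte LF = 10; // line feed

  private final byte[] delimiter; // field delimiter bytes, as in TextBatchFrame

  public DelimiterScanSketch(byte[] delimiter) {
    this.delimiter = delimiter;
  }

  /** True if buf[pos] starts a field delimiter, starts a line end (LF or CR-LF),
   *  or is past the end of the buffer. */
  public boolean startOfDelimiter(byte[] buf, int pos) {
    if (pos >= buf.length) {
      return true;                       // end of buffer ends the field
    }
    byte b = buf[pos];
    if (b == LF) {
      return true;                       // Unix line end
    }
    if (b == CR) {
      // Windows line end is CR followed by LF; assumption: a lone CR is data.
      return pos + 1 < buf.length && buf[pos + 1] == LF;
    }
    // Field delimiter: every delimiter byte must match in order.
    for (int i = 0; i < delimiter.length; i++) {
      if (pos + i >= buf.length || buf[pos + i] != delimiter[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    DelimiterScanSketch s = new DelimiterScanSketch(new byte[] { (byte) '\t' });
    byte[] line = "ab\tcd\r\n".getBytes(StandardCharsets.UTF_8);
    System.out.println(s.startOfDelimiter(line, 0)); // false: 'a' is field data
    System.out.println(s.startOfDelimiter(line, 2)); // true: tab delimiter
    System.out.println(s.startOfDelimiter(line, 5)); // true: CR-LF line end
  }
}

Whichever form is kept, advanceField() also needs to know how many bytes the delimiter or line end occupies so that advancePastDelimiter() can skip over it.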