diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index f513188..676e6c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapred.FileSplit;
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. This class does eager deserialization of row data using serde
  * in the RecordReader layer.
- * It has supports partitions in this layer so that the vectorized batch is populated correctly
+ * It supports partitions in this layer so that the vectorized batch is populated correctly
  * with the partition column.
@@ -96,12 +96,12 @@ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspect
   public VectorizedRowBatchCtx() {
   }
-  
+
   /**
    * Initializes the VectorizedRowBatch context based on an arbitrary object inspector.
-   * Used by non-tablescan operators when they change the vectorization context 
+   * Used by non-tablescan operators when they change the vectorization context
    * @param hiveConf
-   * @param fileKey 
+   * @param fileKey
    *          The key on which to retrieve the extra column mapping from the map scratch
    * @param rowOI
    *          Object inspector that shapes the column types
@@ -114,7 +114,7 @@ public void init(Configuration hiveConf, String fileKey,
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }
-  
+
   /**
    * Initializes VectorizedRowBatch context based on the
@@ -154,7 +154,7 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx
         part.getTableDesc().getProperties() : part.getProperties();
     Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
-    Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); 
+    Deserializer partDeserializer = (Deserializer) serdeclass.newInstance();
     partDeserializer.initialize(hiveConf, partProps);
     StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
         .getObjectInspector();
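Illustrative sketch (not part of the patch): the intended calling sequence for VectorizedRowBatchCtx in a vectorized record reader, based on the calls made by VectorizedTextInputFormat below. `conf` and `split` are assumed to be supplied by the framework.

    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
    try {
      rbCtx.init(conf, split);              // derive column types from the split's table/partition serde
      VectorizedRowBatch batch = rbCtx.createVectorizedRowBatch();
      rbCtx.addPartitionColsToBatch(batch); // fill the constant partition columns once per split
    } catch (Exception e) {
      throw new RuntimeException(e);        // same handling as VectorizedTextRecordReader below
    }
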
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java
new file mode 100644
index 0000000..b749154
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextInputFormat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A vectorized {@link FileInputFormat} for plain text files in UTF-8 format.
+ */
+public class VectorizedTextInputFormat
+    extends FileInputFormat<NullWritable, VectorizedRowBatch>
+    implements InputFormatChecker, VectorizedInputFormatInterface, JobConfigurable {
+
+  private static class VectorizedTextRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    // Reuses the ORC RecordReader interface for batch-at-a-time reading.
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    private float progress = 0.0f;
+    private final VectorizedRowBatchCtx rbCtx;
+    private boolean addPartitionCols = true;
+
+    public VectorizedTextRecordReader(JobConf conf, FileSplit split,
+        byte[] recordDelimiterBytes) {
+
+      rbCtx = new VectorizedRowBatchCtx();
+      try {
+        rbCtx.init(conf, split);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      this.offset = split.getStart();
+      this.length = split.getLength();
+
+      reader = new VectorizedTextReaderImpl(conf, split, recordDelimiterBytes, rbCtx);
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+      if (!reader.hasNext()) {
+        return false;
+      }
+      try {
+        // Check and update partition cols if necessary. Ideally, this should be done
+        // in createValue(), as the partition is constant per split. But Hive uses
+        // CombineHiveRecordReader, which does not call createValue() for each new
+        // RecordReader it creates, so this check is required in next().
+        if (addPartitionCols) {
+          rbCtx.addPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
+        reader.nextBatch(value);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      progress = reader.getProgress();
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      VectorizedRowBatch result = null;
+      try {
+        result = rbCtx.createVectorizedRowBatch();
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+      return result;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset + (long) (progress * length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress;
+    }
+  }
+
+  public VectorizedTextInputFormat() {
+  }
+
+  private CompressionCodecFactory compressionCodecs = null;
+
+  @Override
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+
+  @Override
+  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
+      InputSplit genericSplit, JobConf job, Reporter reporter)
+      throws IOException {
+
+    reporter.setStatus(genericSplit.toString());
+    String delimiter = job.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
+    return new VectorizedTextRecordReader(job, (FileSplit) genericSplit,
+        recordDelimiterBytes);
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException {
+    // Accept any input for now; no text-specific validation is performed yet.
+    return true;
+  }
+}
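Illustrative sketch (not part of the patch): a minimal mapred-API driver for the new format, assuming a hypothetical input path and an enclosing method that throws IOException. In a real deployment, Hive's execution engine drives the format via CombineHiveRecordReader instead.

    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/tmp/text_table"));  // hypothetical path
    VectorizedTextInputFormat format = new VectorizedTextInputFormat();
    format.configure(job);                                 // sets up the compression codec factory
    InputSplit[] splits = format.getSplits(job, 1);
    RecordReader<NullWritable, VectorizedRowBatch> reader =
        format.getRecordReader(splits[0], job, Reporter.NULL);
    NullWritable key = reader.createKey();
    VectorizedRowBatch batch = reader.createValue();       // partition columns are added on the first next()
    while (reader.next(key, batch)) {
      // consume batch.size rows from batch.cols[...]
    }
    reader.close();
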
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java
new file mode 100644
index 0000000..eb7d705
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedTextReaderImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Batch-at-a-time reader for plain text files. Text parsing is not implemented
+ * yet; the reader currently produces a fixed number of dummy batches.
+ */
+public class VectorizedTextReaderImpl implements RecordReader {
+
+  private final JobConf job;
+  private final FileSplit split;
+  private final byte[] recordDelimiterBytes;
+  private final VectorizedRowBatchCtx rbCtx;
+  private FSDataInputStream inStream = null;
+  private long nextReadPosition;
+
+  // Number of dummy batches produced so far; placeholder until real parsing lands.
+  private long dummyBatchCount = 0;
+
+  public VectorizedTextReaderImpl(JobConf job, FileSplit split,
+      byte[] recordDelimiterBytes, VectorizedRowBatchCtx rbCtx) {
+    this.job = job;
+    this.split = split;
+    this.recordDelimiterBytes = recordDelimiterBytes;
+    this.rbCtx = rbCtx;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    // Dummy limit: stop after five batches until real end-of-split detection exists.
+    return dummyBatchCount != 5;
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    throw new RuntimeException("row-mode not implemented in VectorizedTextInputFormat");
+  }
+
+  @Override
+  public VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch)
+      throws IOException {
+    if (previousBatch == null) {
+      try {
+        previousBatch = rbCtx.createVectorizedRowBatch();
+      } catch (HiveException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (inStream == null) {
+      // open file for split and position cursor at start of split
+      Path filePath = split.getPath();
+      FileSystem fs = filePath.getFileSystem(job);
+      inStream = fs.open(filePath);
+      nextReadPosition = split.getStart();
+    }
+
+    // load batch with data from the current position in the split
+    fillBatch(previousBatch);
+
+    // advance to the next position in the split
+    advanceCursor();
+
+    return previousBatch;
+  }
+
+  private void advanceCursor() {
+    // Dummy for now: just count batches. A real implementation would move
+    // nextReadPosition past the bytes consumed by fillBatch().
+    dummyBatchCount++;
+  }
+
+  // fill next batch of data from text input, starting at nextReadPosition
+  private void fillBatch(VectorizedRowBatch b) {
+    // fill batch with dummy string data for now
+    fillBatchDummy(b);
+  }
+
+  // fill string columns of batch with dummy data
+  private void fillBatchDummy(VectorizedRowBatch b) {
+    final int SIZE = 3;
+    // getBytes(Charset) cannot throw, unlike getBytes(String), so no try/catch is needed.
+    byte[] abc = "abc".getBytes(Charsets.UTF_8);
+    for (int i = 0; i < b.cols.length; i++) {
+      if (b.cols[i] instanceof BytesColumnVector) {
+        BytesColumnVector bv = (BytesColumnVector) b.cols[i];
+        for (int j = 0; j < SIZE; j++) {
+          bv.setRef(j, abc, 0, abc.length);
+        }
+      }
+    }
+    b.size = SIZE;
+  }
+
+  @Override
+  public long getRowNumber() throws IOException {
+    // Row numbers are not tracked yet for text.
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // Progress is not computed yet; always reports 0.
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Release the split's input stream if it was opened.
+    if (inStream != null) {
+      inStream.close();
+    }
+  }
+
+  @Override
+  public void seekToRow(long rowCount) throws IOException {
+    // Seeking is not supported by the text reader yet.
+  }
+}
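
Illustrative sketch (not part of the patch): the BytesColumnVector.setRef() pattern that fillBatchDummy() relies on. setRef() stores a reference into the caller's byte array rather than copying, so the backing bytes must remain valid and unmodified for as long as the batch is in use. `batch` is assumed to come from VectorizedRowBatchCtx.createVectorizedRowBatch().

    byte[] bytes = "val".getBytes(Charsets.UTF_8);
    BytesColumnVector bv = (BytesColumnVector) batch.cols[0];  // assumes column 0 is a string column
    bv.setRef(0, bytes, 0, bytes.length);  // row 0 now points into bytes[0..3); no copy is made
    batch.size = 1;                        // number of valid rows in the batch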