commit 55060f33e6acf83bb53c25cd4ae1b19b12af8e9c
Author: Owen O'Malley
Date:   Fri Mar 21 14:21:37 2014 -0700

    HIVE-6604. Fix ACID to work with vectorization.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0ccc3ad..ba8c501 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -990,24 +990,24 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
     }
     OrcSplit split = (OrcSplit) inputSplit;
-    // TODO vectorized reader doesn't work with the new format yet
-    if (vectorMode) {
-      if (!split.getDeltas().isEmpty() || !split.isOriginal()) {
-        throw new IOException("Vectorization and ACID tables are incompatible."
-        );
-      }
-      return createVectorizedReader(inputSplit, conf, reporter);
-    }
     reporter.setStatus(inputSplit.toString());

     // if we are strictly old-school, just use the old code
     if (split.isOriginal() && split.getDeltas().isEmpty()) {
-      return new OrcRecordReader(OrcFile.createReader(split.getPath(),
-          OrcFile.readerOptions(conf)), conf, split);
+      if (vectorMode) {
+        return createVectorizedReader(inputSplit, conf, reporter);
+      } else {
+        return new OrcRecordReader(OrcFile.createReader(split.getPath(),
+            OrcFile.readerOptions(conf)), conf, split);
+      }
     }

     Options options = new Options(conf).reporter(reporter);
     final RowReader<OrcStruct> inner = getReader(inputSplit, options);
+    if (vectorMode) {
+      return (org.apache.hadoop.mapred.RecordReader)
+          new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+    }
     final RecordIdentifier id = inner.createKey();

     // Return a RecordReader that is compatible with the Hive 0.12 reader
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
new file mode 100644
index 0000000..b0bc29e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+
+/**
+ * Implement a RecordReader that stitches together base and delta files to
+ * support tables and partitions stored in the ACID format. It works by using
+ * the non-vectorized ACID reader and moving the data into a vectorized row
+ * batch.
+ */
+class VectorizedOrcAcidRowReader
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable,
+                                                     VectorizedRowBatch> {
+  private final AcidInputFormat.RowReader<OrcStruct> innerReader;
+  private final RecordIdentifier key;
+  private final OrcStruct value;
+  private final VectorizedRowBatchCtx rowBatchCtx;
+  private final ObjectInspector objectInspector;
+  private boolean needToSetPartition = true;
+
+  VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
+                             Configuration conf,
+                             FileSplit split) throws IOException {
+    this.innerReader = inner;
+    this.key = inner.createKey();
+    this.rowBatchCtx = new VectorizedRowBatchCtx();
+    this.value = inner.createValue();
+    this.objectInspector = inner.getObjectInspector();
+    try {
+      rowBatchCtx.init(conf, split);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed to initialize context", e);
+    } catch (SerDeException e) {
+      throw new IOException("Failed to initialize context", e);
+    } catch (InstantiationException e) {
+      throw new IOException("Failed to initialize context", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Failed to initialize context", e);
+    } catch (HiveException e) {
+      throw new IOException("Failed to initialize context", e);
+    }
+  }
+
+  @Override
+  public boolean next(NullWritable nullWritable,
+                      VectorizedRowBatch vectorizedRowBatch
+                      ) throws IOException {
+    vectorizedRowBatch.reset();
+    if (!innerReader.next(key, value)) {
+      return false;
+    }
+    if (needToSetPartition) {
+      try {
+        rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
+      } catch (HiveException e) {
+        throw new IOException("Problem adding partition column", e);
+      }
+      needToSetPartition = false;
+    }
+    try {
+      VectorizedBatchUtil.AddRowToBatch(value,
+          (StructObjectInspector) objectInspector,
+          vectorizedRowBatch.size++, vectorizedRowBatch);
+      while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
+          innerReader.next(key, value)) {
+        VectorizedBatchUtil.AddRowToBatch(value,
+            (StructObjectInspector) objectInspector,
+            vectorizedRowBatch.size++, vectorizedRowBatch);
+      }
+    } catch (HiveException he) {
+      throw new IOException("error iterating", he);
+    }
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return VectorizedOrcInputFormat.VectorizedOrcRecordReader
+        .createRowBatch(rowBatchCtx);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return innerReader.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    innerReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return innerReader.getProgress();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 25fec62..39d36a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -59,7 +59,6 @@ VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
       List<OrcProto.Type> types = file.getTypes();
-      // TODO fix to work with ACID
       Reader.Options options = new Reader.Options();
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
@@ -93,7 +92,6 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException
         addPartitionCols = false;
       }
       reader.nextBatch(value);
-      rbCtx.convertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -108,13 +106,15 @@ public NullWritable createKey() {
     @Override
     public VectorizedRowBatch createValue() {
-      VectorizedRowBatch result = null;
+      return createRowBatch(rbCtx);
+    }
+
+    static VectorizedRowBatch createRowBatch(VectorizedRowBatchCtx ctx) {
       try {
-        result = rbCtx.createVectorizedRowBatch();
+        return ctx.createVectorizedRowBatch();
       } catch (HiveException e) {
         throw new RuntimeException("Error creating a batch", e);
       }
-      return result;
     }

     @Override
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f0e1e39..3d789ee 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -1110,14 +1110,17 @@ public void testVectorizationWithAcid() throws Exception {
     InputSplit[] splits = inputFormat.getSplits(conf, 10);
     assertEquals(1, splits.length);
-    try {
-      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>
+    org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>
         reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL);
-      assertTrue("should throw here", false);
-    } catch (IOException ioe) {
-      assertEquals("java.io.IOException: Vectorization and ACID tables are incompatible.",
-          ioe.getMessage());
+    NullWritable key = reader.createKey();
+    VectorizedRowBatch value = reader.createValue();
+    assertEquals(true, reader.next(key, value));
+    assertEquals(10, value.count());
+    LongColumnVector col0 = (LongColumnVector) value.cols[0];
+    for(int i=0; i < 10; i++) {
+      assertEquals("checking " + i, i, col0.vector[i]);
     }
+    assertEquals(false, reader.next(key, value));
   }

   // test non-vectorized, non-acid, combine
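
Taken together, getRecordReader in OrcInputFormat now picks one of three readers: the plain OrcRecordReader for non-vectorized original files, the existing vectorized ORC reader for original files with no deltas, and the new VectorizedOrcAcidRowReader when deltas or a non-original layout are present in vector mode. The following is a minimal consumer sketch, not part of the commit, mirroring how the updated test drives the new path; the class name and table path are made-up examples, and it assumes vector mode has already been enabled for the task (how that flag is derived is outside this diff).

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class VectorizedAcidScanSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Made-up location: an ACID table directory holding a base file plus
    // delta directories that VectorizedOrcAcidRowReader stitches together.
    FileInputFormat.setInputPaths(conf, new Path("/warehouse/demo_acid_table"));

    OrcInputFormat inputFormat = new OrcInputFormat();
    InputSplit[] splits = inputFormat.getSplits(conf, 1);

    // With vector mode on and deltas present, getRecordReader now returns a
    // VectorizedOrcAcidRowReader via the raw cast shown in the diff, instead
    // of throwing "Vectorization and ACID tables are incompatible."
    RecordReader<NullWritable, VectorizedRowBatch> reader =
        (RecordReader<NullWritable, VectorizedRowBatch>) (RecordReader<?, ?>)
            inputFormat.getRecordReader(splits[0], conf, Reporter.NULL);

    NullWritable key = reader.createKey();
    VectorizedRowBatch batch = reader.createValue();
    while (reader.next(key, batch)) {
      // Each next() call copies rows one at a time with AddRowToBatch until
      // the batch is full (batch.selected.length rows) or input runs out.
      System.out.println("read a batch of " + batch.size + " rows");
    }
    reader.close();
  }
}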