diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 9d16889..672ed4f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,26 +19,17 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -63,16 +54,13 @@
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOiBatchToRowReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -84,8 +72,9 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -122,7 +111,7 @@
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
     boolean useLlapIo = true;
     if (split instanceof LlapAwareSplit) {
-      useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
+      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
     }
 
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
@@ -135,15 +124,26 @@
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
       return sourceInputFormat.getRecordReader(split, job, reporter);
     }
-    FileSplit fileSplit = (FileSplit)split;
+    FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
      List includedCols = ColumnProjectionUtils.isReadAllColumns(job) ?
          null : ColumnProjectionUtils.getReadColumnIDs(job);
      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-      if (isVectorized) return rr;
+      // currently only the evolutions that are safe for PPD is supported.
+      // byte -> short -> int -> bigint
+      // string <-> varchar
+      // For all other evolutions, LLAP IO will be disabled. This check cannot be done at compile
+      // time as we don't know file schema during compilation.
+      if (!rr.supportedSchemaEvolution()) {
+        LlapIoImpl.LOG.warn("Unsupported schema evolution. Not using LLAP IO for split: {}", split);
+        return sourceInputFormat.getRecordReader(split, job, reporter);
+      }
+      if (isVectorized) {
+        return rr;
+      }
      if (sourceInputFormat instanceof BatchToRowInputFormat) {
-        return bogusCast(((BatchToRowInputFormat)sourceInputFormat).getWrapper(
+        return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
            rr, rr.getVectorizedRowBatchCtx(), includedCols));
      }
      return sourceInputFormat.getRecordReader(split, job, reporter);
@@ -186,6 +186,10 @@
   private final QueryFragmentCounters counters;
   private long firstReturnTime;
 
+  private TypeDescription fileSchema;
+  private boolean[] includedColumns;
+  private boolean safeSchemaEvolution;
+
   public LlapRecordReader(JobConf job, FileSplit split, List includedCols,
       String hostName) throws IOException, HiveException {
     this.split = split;
@@ -221,6 +225,19 @@ public LlapRecordReader(JobConf job, FileSplit split, List includedCols
     }
 
     startRead();
+
+    boolean isAcidScan = HiveConf.getBoolVar(job, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    safeSchemaEvolution = true;
+    TypeDescription readerSchema = OrcInputFormat.getDesiredRowTypeDescr(job, isAcidScan,
+        Integer.MAX_VALUE);
+    SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, readerSchema,
+        includedColumns);
+    for (Integer colId : includedCols) {
+      if (!schemaEvolution.isPPDSafeConversion(colId)) {
+        safeSchemaEvolution = false;
+        break;
+      }
+    }
   }
 
   @Override
@@ -273,6 +290,10 @@ public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
     return rbCtx;
   }
 
+  public boolean supportedSchemaEvolution() {
+    return safeSchemaEvolution;
+  }
+
   private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
     @Override
     public void uncaughtException(final Thread t, final Throwable e) {
@@ -282,11 +303,14 @@ public void uncaughtException(final Thread t, final Throwable e) {
     }
   }
 
-  private void startRead() {
+  private void startRead() throws IOException {
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     ReadPipeline rp = cvp.createReadPipeline(
         this, split, columnIds, sarg, columnNames, counters);
+    fileSchema = rp.getFileSchema();
+    includedColumns = rp.getIncludedColumns();
     feedback = rp;
+
     if (executor instanceof StatsRecordingThreadPool) {
       // Every thread created by this thread pool will use the same handler
       ((StatsRecordingThreadPool) executor)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index abd4533..b77dfbb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -32,5 +33,5 @@
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer consumer, FileSplit split,
       List columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters);
+      QueryFragmentCounters counters) throws IOException;
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 7db519c..a919828 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -64,13 +65,15 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
   public ReadPipeline createReadPipeline(
       Consumer consumer, FileSplit split, List columnIds,
       SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters) {
+      QueryFragmentCounters counters) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, ioMetrics);
     OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
         metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
+    edc.setFileMetadata(reader.getFileMetadata());
+    edc.setIncludedColumns(reader.getIncludedColumns());
     return edc;
   }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 3dfab63..94e4750 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -46,6 +46,8 @@
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 import org.apache.orc.OrcProto;
@@ -59,6 +61,7 @@
   private OrcStripeMetadata[] stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
+  private boolean[] includedColumns;
 
   public OrcEncodedDataConsumer(
       Consumer consumer, int colCount, boolean skipCorrupt,
@@ -228,4 +231,18 @@ private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
   private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
     return rowIndexEntry.getStatistics().getNumberOfValues();
   }
+
+  @Override
+  public TypeDescription getFileSchema() {
+    return OrcUtils.convertTypeFromProtobuf(fileMetadata.getTypes(), 0);
+  }
+
+  @Override
+  public boolean[] getIncludedColumns() {
+    return includedColumns;
+  }
+
+  public void setIncludedColumns(final boolean[] includedColumns) {
+    this.includedColumns = includedColumns;
+  }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 21b1772..1987451 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -21,7 +21,10 @@
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.orc.TypeDescription;
 
 public interface ReadPipeline extends ConsumerFeedback {
   public Callable getReadCallable();
+  TypeDescription getFileSchema();
+  boolean[] getIncludedColumns();
 }
\ No newline at end of file
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 93c40e4..7e0e065 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -162,10 +162,12 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
+  boolean[] globalIncludes = null;
+
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
       OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List columnIds,
       SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
-      QueryFragmentCounters counters) {
+      QueryFragmentCounters counters) throws IOException {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.bufferManager = bufferManager;
@@ -184,6 +186,17 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+
+    // moved this part of code from performDataRead as LlapInputFormat need to know the file schema
+    // to decide if schema evolution is supported or not
+    orcReader = null;
+    // 1. Get file metadata from cache, or create the reader and read it.
+    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
+    fs = split.getPath().getFileSystem(conf);
+    fileKey = determineFileId(fs, split,
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+    fileMetadata = getOrReadFileMetadata();
+    globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
   }
 
   @Override
@@ -222,18 +235,9 @@ protected Void performDataRead() throws IOException {
       return null;
     }
     counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
-    orcReader = null;
-    // 1. Get file metadata from cache, or create the reader and read it.
-    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fs = split.getPath().getFileSystem(conf);
-    fileKey = determineFileId(fs, split,
-        HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
         + (fileKey == null ? "" : " (" + fileKey + ")"));
-    try {
-      fileMetadata = getOrReadFileMetadata();
-      consumer.setFileMetadata(fileMetadata);
      validateFileMetadata();
      if (columnIds == null) {
        columnIds = createColumnIds(fileMetadata);
@@ -257,10 +261,8 @@ protected Void performDataRead() throws IOException {
     // 3. Apply SARG if needed, and otherwise determine what RGs to read.
     int stride = fileMetadata.getRowIndexStride();
     ArrayList stripeMetadatas = null;
-    boolean[] globalIncludes = null;
     boolean[] sargColumns = null;
     try {
-      globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
       if (sarg != null && stride != 0) {
         // TODO: move this to a common method
         int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(
@@ -816,6 +818,14 @@
   public void determineStripesToRead() {
     readState = new boolean[stripeIxTo - stripeIxFrom][][];
   }
 
+  public OrcFileMetadata getFileMetadata() {
+    return fileMetadata;
+  }
+
+  public boolean[] getIncludedColumns() {
+    return globalIncludes;
+  }
+
   private class DataWrapperForOrc implements DataReader, DataCache {
     private final DataReader orcDataReader;
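Illustration (not part of the patch): a minimal standalone sketch of the PPD-safety check that LlapRecordReader now performs, using the same SchemaEvolution constructor and isPPDSafeConversion call the patch relies on. The class name, schemas, and column ids below are invented for the example; the expectation is that the widening int -> bigint evolution is accepted, so LLAP IO would stay enabled for such a split.

import java.util.Arrays;

import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

// Hypothetical demo class, not part of the patch.
public class PpdSafeEvolutionDemo {
  public static void main(String[] args) {
    // The file was written with an int column; the reader now asks for bigint.
    TypeDescription fileSchema = TypeDescription.fromString("struct<id:int,name:string>");
    TypeDescription readerSchema = TypeDescription.fromString("struct<id:bigint,name:string>");

    // Include every column; id 0 is the root struct, 1 and 2 are the fields.
    boolean[] included = new boolean[fileSchema.getMaximumId() + 1];
    Arrays.fill(included, true);

    // Same constructor the patch uses in LlapRecordReader.
    SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, included);

    boolean safe = true;
    for (int colId = 1; colId <= fileSchema.getMaximumId(); ++colId) {
      if (!evolution.isPPDSafeConversion(colId)) {
        safe = false;
        break;
      }
    }
    // int -> bigint is a widening conversion, so this should print true.
    System.out.println("PPD-safe schema evolution: " + safe);
  }
}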