diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 9d16889..a27266f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,26 +19,17 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
-import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
-import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -63,16 +54,13 @@
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOiBatchToRowReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -84,8 +72,9 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -122,7 +111,7 @@
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
     boolean useLlapIo = true;
     if (split instanceof LlapAwareSplit) {
-      useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
+      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
     }
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
 
@@ -135,17 +124,28 @@
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
       return sourceInputFormat.getRecordReader(split, job, reporter);
     }
-    FileSplit fileSplit = (FileSplit)split;
+    FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job) ?
           null : ColumnProjectionUtils.getReadColumnIDs(job);
       LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-      if (isVectorized) return rr;
+
+      if (!rr.init()) {
+        return sourceInputFormat.getRecordReader(split, job, reporter);
+      }
+
+      // vectorized row batch reader
+      if (isVectorized) {
+        return rr;
+      }
+
+      // row batch to row-by-row reader
       if (sourceInputFormat instanceof BatchToRowInputFormat) {
-        return bogusCast(((BatchToRowInputFormat)sourceInputFormat).getWrapper(
+        return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
             rr, rr.getVectorizedRowBatchCtx(), includedCols));
       }
+      return sourceInputFormat.getRecordReader(split, job, reporter);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
@@ -182,12 +182,18 @@
     /** Vector that is currently being processed by our user. */
     private boolean isDone = false;
     private final boolean isClosed = false;
-    private ConsumerFeedback<ColumnVectorBatch> feedback;
+    private final ConsumerFeedback<ColumnVectorBatch> feedback;
     private final QueryFragmentCounters counters;
    private long firstReturnTime;
 
+    private final JobConf jobConf;
+    private final TypeDescription fileSchema;
+    private final boolean[] includedColumns;
+    private final ReadPipeline rp;
+
     public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
         String hostName) throws IOException, HiveException {
+      this.jobConf = job;
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
@@ -220,7 +226,37 @@ public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols
         partitionValues = null;
       }
 
-      startRead();
+      // Create the consumer of encoded data; it will coordinate decoding to CVBs.
+      rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters);
+      feedback = rp;
+      fileSchema = rp.getFileSchema();
+      includedColumns = rp.getIncludedColumns();
+    }
+
+    /**
+     * Starts the data read pipeline, after validating that schema evolution for this split is supported.
+     */
+    public boolean init() {
+      boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+      TypeDescription readerSchema = OrcInputFormat.getDesiredRowTypeDescr(jobConf, isAcidScan,
+          Integer.MAX_VALUE);
+      SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema, readerSchema,
+          includedColumns);
+      for (Integer colId : columnIds) {
+        if (!schemaEvolution.isPPDSafeConversion(colId)) {
+          LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
+          return false;
+        }
+      }
+
+      // perform the data read asynchronously
+      if (executor instanceof StatsRecordingThreadPool) {
+        // Every thread created by this thread pool will use the same handler
+        ((StatsRecordingThreadPool) executor)
+            .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
+      }
+      executor.submit(rp.getReadCallable());
+      return true;
     }
 
     @Override
@@ -282,19 +318,6 @@ public void uncaughtException(final Thread t, final Throwable e) {
       }
     }
 
-    private void startRead() {
-      // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-      ReadPipeline rp = cvp.createReadPipeline(
-          this, split, columnIds, sarg, columnNames, counters);
-      feedback = rp;
-      if (executor instanceof StatsRecordingThreadPool) {
-        // Every thread created by this thread pool will use the same handler
-        ((StatsRecordingThreadPool) executor)
-            .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
-      }
-      executor.submit(rp.getReadCallable());
-    }
-
     ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
       boolean isFirst = (lastCvb == null);
       if (!isFirst) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index abd4533..b77dfbb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 
@@ -32,5 +33,5 @@
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters);
+      QueryFragmentCounters counters) throws IOException;
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 7db519c..0a8e3df 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -64,7 +65,7 @@ public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
   public ReadPipeline createReadPipeline(
       Consumer<ColumnVectorBatch> consumer, FileSplit split,
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters) {
+      QueryFragmentCounters counters) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, ioMetrics);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 3dfab63..94e4750 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -46,6 +46,8 @@
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 import org.apache.orc.OrcProto;
@@ -59,6 +61,7 @@
   private OrcStripeMetadata[] stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
+  private boolean[] includedColumns;
 
   public OrcEncodedDataConsumer(
       Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
@@ -228,4 +231,18 @@ private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
   private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
     return rowIndexEntry.getStatistics().getNumberOfValues();
   }
+
+  @Override
+  public TypeDescription getFileSchema() {
+    return OrcUtils.convertTypeFromProtobuf(fileMetadata.getTypes(), 0);
+  }
+
+  @Override
+  public boolean[] getIncludedColumns() {
+    return includedColumns;
+  }
+
+  public void setIncludedColumns(final boolean[] includedColumns) {
+    this.includedColumns = includedColumns;
+  }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 21b1772..1987451 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -21,7 +21,10 @@
 
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.orc.TypeDescription;
 
 public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> {
   public Callable<Void> getReadCallable();
+  TypeDescription getFileSchema();
+  boolean[] getIncludedColumns();
 }
\ No newline at end of file
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 93c40e4..4af9dc2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -162,10 +162,12 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
+  boolean[] globalIncludes = null;
+
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
       OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
       SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
-      QueryFragmentCounters counters) {
+      QueryFragmentCounters counters) throws IOException {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.bufferManager = bufferManager;
@@ -184,6 +186,19 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+
+    // Moved this part of the code from performDataRead, as LlapInputFormat needs to know the
+    // file schema to decide whether schema evolution is supported or not.
+    orcReader = null;
+    // 1. Get file metadata from cache, or create the reader and read it.
+    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
+    fs = split.getPath().getFileSystem(conf);
+    fileKey = determineFileId(fs, split,
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+    fileMetadata = getOrReadFileMetadata();
+    globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
+    consumer.setFileMetadata(fileMetadata);
+    consumer.setIncludedColumns(globalIncludes);
   }
 
   @Override
@@ -222,18 +237,9 @@ protected Void performDataRead() throws IOException {
       return null;
     }
     counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
-    orcReader = null;
-    // 1. Get file metadata from cache, or create the reader and read it.
-    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fs = split.getPath().getFileSystem(conf);
-    fileKey = determineFileId(fs, split,
-        HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
         + (fileKey == null ? "" : " (" + fileKey + ")"));
-
     try {
-      fileMetadata = getOrReadFileMetadata();
-      consumer.setFileMetadata(fileMetadata);
       validateFileMetadata();
       if (columnIds == null) {
         columnIds = createColumnIds(fileMetadata);
@@ -257,10 +263,8 @@ protected Void performDataRead() throws IOException {
     // 3. Apply SARG if needed, and otherwise determine what RGs to read.
     int stride = fileMetadata.getRowIndexStride();
     ArrayList<OrcStripeMetadata> stripeMetadatas = null;
-    boolean[] globalIncludes = null;
     boolean[] sargColumns = null;
     try {
-      globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
       if (sarg != null && stride != 0) {
         // TODO: move this to a common method
         int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0a2c3fa..2889f63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2169,12 +2169,11 @@ public static TypeDescription convertTypeInfo(TypeInfo info) {
    * @param isAcidRead is this an acid format?
    * @param dataColumns the desired number of data columns for vectorized read
    * @return the desired schema or null if schema evolution isn't enabled
-   * @throws IOException
+   * @throws IllegalArgumentException
    */
   public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
                                                        boolean isAcidRead,
-                                                       int dataColumns
-                                                       ) throws IOException {
+                                                       int dataColumns) {
 
     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -2205,7 +2204,7 @@ public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
         }
       }
     } else if (isAcidRead) {
-      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+      throw new IllegalArgumentException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
     }
   }
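
For illustration only, not part of the patch: a minimal standalone sketch of the guard that LlapRecordReader.init() introduces above, assuming the same ORC SchemaEvolution API the patch relies on (the (fileSchema, readerSchema, included) constructor and isPPDSafeConversion). The SchemaEvolutionGuardSketch class name, the schemas, and the column ids are hypothetical; the patch runs this check per projected column before submitting the read pipeline to the executor, so unsupported conversions fall back to the source input format instead of the LLAP decode path.

import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

public class SchemaEvolutionGuardSketch {
  public static void main(String[] args) {
    // Schema as written in the ORC file: struct<id:int,name:string>.
    TypeDescription fileSchema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createInt())
        .addField("name", TypeDescription.createString());
    // Schema the reader asks for: "id" widened from int to bigint.
    TypeDescription readerSchema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createString());
    // Include the root struct (column 0) plus both data columns (1 and 2),
    // mirroring the includedColumns array the read pipeline reports.
    boolean[] included = new boolean[] { true, true, true };

    SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, included);
    for (int colId = 1; colId <= 2; colId++) {
      // init() bails out (and LlapInputFormat falls back to the source input
      // format's record reader) as soon as any projected column is not PPD-safe.
      System.out.println("column " + colId + " PPD-safe: "
          + evolution.isPPDSafeConversion(colId));
    }
  }
}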