diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0eee5827a02..083e9e9c2d4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4099,6 +4099,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "Value is an integer. Default value is -1, which means that we will estimate this value from operators in the plan."),
     // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
+    LLAP_IO_CACHE_ONLY("hive.llap.io.cache.only", false, "Whether the query should read from cache only. If set to " +
+        "true and a cache miss happens during the read an exception will occur. Primarily used for testing."),
     LLAP_IO_ROW_WRAPPER_ENABLED("hive.llap.io.row.wrapper.enabled", true, "Whether the LLAP IO row wrapper is enabled for non-vectorized queries."),
     LLAP_IO_ACID_ENABLED("hive.llap.io.acid", true, "Whether the LLAP IO layer is enabled for ACID."),
     LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 2893870c753..72951c734cd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -105,6 +105,8 @@
 
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 /**
  * This produces EncodedColumnBatch via ORC EncodedDataImpl.
  * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
@@ -172,6 +174,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private Object fileKey;
   private final CacheTag cacheTag;
   private final Map parts;
+  private final boolean isReadCacheOnly;
 
   private Supplier fsSupplier;
@@ -246,6 +249,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
         ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64"));
     consumer.setFileMetadata(fileMetadata);
     consumer.setSchemaEvolution(evolution);
+    isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
   }
 
   @Override
@@ -462,7 +466,7 @@ private void ensureDataReader() throws IOException {
     // Reader creation updates HDFS counters, don't do it here.
     DataWrapperForOrc dw = new DataWrapperForOrc();
     stripeReader = orcReader.encodedReader(
-        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
+        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag, isReadCacheOnly);
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
     stripeReader.setStopped(isStopped);
   }
@@ -610,6 +614,7 @@ private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException {
         }
       }
       counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     ensureOrcReader();
     ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
@@ -691,6 +696,7 @@ private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInfo
         }
       }
       counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     long offset = si.getOffset() + si.getIndexLength() + si.getDataLength();
     long startTime = counters.startTimeCounter();
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index c73ba2c6e9e..c9e9c02f3b3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -105,6 +105,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 public class SerDeEncodedDataReader extends CallableWithNdc
     implements ConsumerFeedback, TezCounterSource {
@@ -165,6 +167,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
 
   private final boolean[] writerIncludes;
   private FileReaderYieldReturn currentFileRead = null;
+  private final boolean isReadCacheOnly;
 
   /**
    * Data from cache currently being processed. We store it here so that we could decref
@@ -232,6 +235,7 @@ public MemoryBuffer create() {
     SchemaEvolution evolution = new SchemaEvolution(schema, null,
         new Reader.Options(jobConf).include(writerIncludes));
     consumer.setSchemaEvolution(evolution);
+    isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
   }
 
   private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
@@ -810,6 +814,9 @@ public Boolean readFileWithCache(long startTime) throws IOException, Interrupted
     long endOfSplit = split.getStart() + split.getLength();
     this.cachedData = cache.getFileData(fileKey, split.getStart(), endOfSplit,
         writerIncludes, CC_FACTORY, counters, gotAllData);
+    if (!gotAllData.value) {
+      throwIfCacheOnlyRead(isReadCacheOnly);
+    }
     if (cachedData == null) {
       if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
         LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index a041426bba4..9ad148681bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,4 +109,11 @@ public static MapWork findMapWork(JobConf job) throws HiveException {
     return (MapWork) work;
   }
 
+  public static void throwIfCacheOnlyRead(boolean isCacheOnlyRead) throws IOException {
+    if (isCacheOnlyRead) {
+      throw new IOException("LLAP cache miss happened while reading. Aborting query as cache only reading is set. " +
+          "Set " + HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname + " to false and repeat query if this was unintended.");
+    }
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 117e4e6eac7..1f5a9d7803b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -75,6 +75,8 @@
 
 import sun.misc.Cleaner;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 /**
  * Encoded reader implementation.
@@ -152,11 +154,12 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
   private final CacheTag tag;
   private AtomicBoolean isStopped;
   private StoppableAllocator allocator;
+  private final boolean isReadCacheOnly;
 
   public EncodedReaderImpl(Object fileKey, List types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
@@ -173,6 +176,7 @@ public EncodedReaderImpl(Object fileKey, List types,
     this.dataReader = dataReader;
     this.trace = trace;
     this.tag = tag;
+    this.isReadCacheOnly = isReadCacheOnly;
     if (POOLS != null) return;
     if (pf == null) {
       pf = new NoopPoolFactory();
@@ -600,6 +604,9 @@ private static int countMaxStreams(Area area) {
     BooleanRef isAllInCache = new BooleanRef();
     if (hasFileId) {
       cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
+      if (!isAllInCache.value) {
+        throwIfCacheOnlyRead(isReadCacheOnly);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Disk ranges after cache (found everything " + isAllInCache.value + "; file "
             + fileKey + ", base offset " + stripeOffset + "): "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 8d3336f68c8..d5712481af0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -47,7 +47,7 @@
    * @return The reader.
    */
   EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException;
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException;
 
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index e137c24479e..ac345858b0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -36,8 +36,8 @@ public ReaderImpl(Path path, ReaderOptions options) throws IOException {
 
   @Override
   public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
     return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
-        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
+        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag, isReadCacheOnly);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index ea6dfb8a881..2657e1d8c90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -86,6 +86,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -110,6 +111,7 @@
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
   private Map parts;
+  private final boolean isReadCacheOnly;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -151,6 +153,7 @@ public VectorizedParquetRecordReader(
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
       jobConf = conf;
+      isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
       ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
       if (inputSplit != null) {
@@ -316,6 +319,8 @@ private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
       } finally {
         metadataCache.decRefBuffer(footerData);
       }
+    } else {
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     final FileSystem fs = file.getFileSystem(configuration);
     final FileStatus stat = fs.getFileStatus(file);
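
Not part of the patch: a minimal sketch of how the new flag could be exercised, assuming a plain JobConf in a test. The wrapper class, main method, and the printed fallback are hypothetical; only ConfVars.LLAP_IO_CACHE_ONLY, HiveConf.getBoolVar and LlapHiveUtils.throwIfCacheOnlyRead come from the diff above.

import java.io.IOException;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical illustration class, not part of the patch.
public class CacheOnlyReadSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Turn on cache-only reads, as a test session would via "SET hive.llap.io.cache.only=true;".
    jobConf.setBoolean(ConfVars.LLAP_IO_CACHE_ONLY.varname, true);

    boolean isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
    try {
      // The readers call this guard right after a cache miss is detected
      // (metadata cache miss, or gotAllData/isAllInCache is false).
      LlapHiveUtils.throwIfCacheOnlyRead(isReadCacheOnly);
    } catch (IOException e) {
      // With the flag set, a cache miss aborts the read instead of falling back to disk.
      System.out.println(e.getMessage());
    }
  }
}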