diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index f68ebd7c6d..dcb24b8018 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -224,6 +224,7 @@ public DiskRangeList createCacheChunk(
         int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
         ByteBuffer data = drl.getData().duplicate();
         data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+        cache.releaseBuffer(((CacheChunk)drl).getBuffer());
         sizeRead += candidateSize;
         drl = drl.next;
       }
@@ -250,6 +251,7 @@ public DiskRangeList createCacheChunk(
         if (candidate.hasData()) {
           ByteBuffer data = candidate.getData().duplicate();
           data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+          cache.releaseBuffer(((CacheChunk)candidate).getBuffer());
           sizeRead += candidateSize;
           continue;
         }
@@ -269,6 +271,10 @@ public DiskRangeList createCacheChunk(
         long chunkFrom = Math.max(from, missingChunk.getKey()),
             chunkTo = Math.min(to, missingChunk.getValue()),
             chunkLength = chunkTo - chunkFrom;
+        // TODO: if we allow partial reads (right now we disable this), we'd have to handle it here.
+        //       chunksInThisRead should probably be changed to be a struct array indicating both
+        //       partial and full sizes for each chunk; then the partial ones could be merged
+        //       with the previous partial ones, and any newly-full chunks put in the cache.
         MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null;
         try {
           int largeBufCount = (int) (chunkLength / maxAlloc);
@@ -364,12 +370,12 @@ private void copyDiskDataToCacheBuffer(byte[] diskData, int offsetInDiskData, in
         int maxAlloc, long from, long to) {
       Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from);
       if (firstMissing == null) {
-        throw new AssertionError("No lower bound for offset " + from);
+        throw new AssertionError("No lower bound for start offset " + from);
       }
       if (firstMissing.getValue() <= from
           || ((from - firstMissing.getKey()) % maxAlloc) != 0) {
         // The data does not belong to a recognized chunk, or is split wrong.
-        throw new AssertionError("Lower bound for offset " + from + " is ["
+        throw new AssertionError("Lower bound for start offset " + from + " is ["
             + firstMissing.getKey() + ", " + firstMissing.getValue() + ")");
       }
       SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to);
@@ -381,7 +387,7 @@ private void copyDiskDataToCacheBuffer(byte[] diskData, int offsetInDiskData, in
       if (lastMissingEnd < to
           || (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) {
         // The data does not belong to a recognized chunk, or is split wrong.
-        throw new AssertionError("Lower bound for offset " + to + " is ["
+        throw new AssertionError("Lower bound for end offset " + to + " is ["
             + lastMissingOffset + ", " + lastMissingEnd + ")");
       }
       return missingChunks;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ec8527ef27..3cb7ab5279 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -274,7 +274,7 @@ public void configure(JobConf job) {
       if (part == null) {
         if (isCacheOnly) {
           LOG.info("Using cache only because there's no partition spec for SerDe-based IF");
-          injectLlapCaches(inputFormat, llapIo);
+          injectLlapCaches(inputFormat, llapIo, conf);
         } else {
           LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF");
         }
@@ -294,7 +294,7 @@ public void configure(JobConf job) {
       }
     }
     if (isCacheOnly) {
-      injectLlapCaches(inputFormat, llapIo);
+      injectLlapCaches(inputFormat, llapIo, conf);
     }
     return inputFormat;
   }
@@ -320,8 +320,9 @@ private static boolean checkInputFormatForLlapEncode(Configuration conf, String
   }
 
   public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat,
-      LlapIo<VectorizedRowBatch> llapIo) {
+      LlapIo<VectorizedRowBatch> llapIo, Configuration conf) {
     LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName());
+    conf.setInt("parquet.read.allocation.size", 1024*1024*1024); // Disable buffer splitting for now.
     llapIo.initCacheOnlyInputFormat(inputFormat);
   }
 
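Note on the change (commentary, not part of the patch): the buffers returned by the
DataCache on the cache-hit path come back pinned (ref-counted), and the two added
cache.releaseBuffer() calls unpin them once their contents have been copied into the
destination array; previously, each hit presumably left its buffer pinned and
un-evictable. The parquet.read.allocation.size override of 1GB makes each read use a
single large allocation rather than splitting across buffers, matching the inline
"Disable buffer splitting for now" comment and the TODO about partial reads being
disabled. A minimal sketch of the acquire/copy/release discipline follows; the Cache
and MemBuf types are hypothetical stand-ins for the roles played in Hive by DataCache
and MemoryBuffer:

    import java.nio.ByteBuffer;

    // Hypothetical stand-ins, for illustration only.
    interface MemBuf { ByteBuffer getByteBufferDup(); }

    interface Cache {
      MemBuf getBuffer(long offset);   // pins the buffer (incRef) on a cache hit
      void releaseBuffer(MemBuf buf);  // unpins it (decRef) so it can be evicted
    }

    final class CachedReadSketch {
      // Copies len cached bytes at offset into dest, always releasing the pin.
      static void copyCachedRange(Cache cache, long offset, byte[] dest, int destPos, int len) {
        MemBuf buf = cache.getBuffer(offset);
        try {
          // A duplicate() view keeps the shared buffer's position/limit untouched,
          // mirroring the getData().duplicate() calls in the patch.
          buf.getByteBufferDup().get(dest, destPos, len);
        } finally {
          // The step the patch adds: without it, every cache hit leaks a
          // refcount and the buffer can never be evicted.
          cache.releaseBuffer(buf);
        }
      }
    }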