diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 95c5c0e..ef3d698 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.conf; import com.google.common.base.Joiner; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -29,6 +30,7 @@ import org.apache.hadoop.hive.conf.Validator.SizeValidator; import org.apache.hadoop.hive.conf.Validator.StringSet; import org.apache.hadoop.hive.conf.Validator.TimeValidator; +import org.apache.hadoop.hive.conf.Validator.WritableDirectoryValidator; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; @@ -2556,6 +2558,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"), LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true, "Whether ORC low-level cache should use direct allocation."), + LLAP_ALLOCATOR_MAPPED("hive.llap.io.allocator.mmap", false, + "Whether ORC low-level cache should use memory mapped allocation (direct I/O). \n" + + "This is recommended to be used along-side NVDIMM (DAX) or NVMe flash storage."), + LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/dev/shm", + new WritableDirectoryValidator(), + "The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."), LLAP_USE_LRFU("hive.llap.io.use.lrfu", false, "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."), LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f, diff --git common/src/java/org/apache/hadoop/hive/conf/Validator.java common/src/java/org/apache/hadoop/hive/conf/Validator.java index 3fb09b9..838660a 100644 --- common/src/java/org/apache/hadoop/hive/conf/Validator.java +++ common/src/java/org/apache/hadoop/hive/conf/Validator.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.conf; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashSet; @@ -346,4 +349,30 @@ private String sizeString(long size) { return current > 0 ? ((long)(size / current) + "Pb") : (size + units[0]); } } + + public class WritableDirectoryValidator implements Validator { + + @Override + public String validate(String value) { + final Path path = FileSystems.getDefault().getPath(value); + if (path == null && value != null) { + return String.format("Path '%s' provided could not be located.", value); + } + final boolean isDir = Files.isDirectory(path); + final boolean isWritable = Files.isWritable(path); + if (!isDir) { + return String.format("Path '%s' provided is not a directory.", value); + } + if (!isWritable) { + return String.format("Path '%s' provided is not writable.", value); + } + return null; + } + + @Override + public String toDescription() { + return "Expects a writable directory on the local filesystem"; + } + } + } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 824ff33..182ad00 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -18,9 +18,23 @@ package org.apache.hadoop.hive.llap.cache; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.conf.HiveConf; @@ -39,33 +53,62 @@ private final int minAllocation, maxAllocation, arenaSize; private final long maxSize; private final boolean isDirect; + private final boolean isMapped; + private final Path cacheDir; private final LlapDaemonCacheMetrics metrics; // We don't know the acceptable size for Java array, so we'll use 1Gb boundary. // That is guaranteed to fit any maximum allocation. private static final int MAX_ARENA_SIZE = 1024*1024*1024; + private static final FileAttribute> RWX = PosixFilePermissions + .asFileAttribute(PosixFilePermissions.fromString("rwx------")); + private static final FileAttribute> RW_ = PosixFilePermissions + .asFileAttribute(PosixFilePermissions.fromString("rw-------")); + public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) { this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT), + HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED), (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC), (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC), HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT), HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE), + HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH), mm, metrics); } @VisibleForTesting public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount, long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) { + this(isDirectVal, false /*isMapped*/, minAllocVal, maxAllocVal, arenaCount, maxSizeVal, null /* mapping path */, memoryManager, metrics); + } + + @VisibleForTesting + public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal, + int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager, + LlapDaemonCacheMetrics metrics) { isDirect = isDirectVal; + isMapped = isMappedVal; minAllocation = minAllocVal; maxAllocation = maxAllocVal; + if (isMapped) { + try { + cacheDir = + Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX); + } catch (IOException ioe) { + // conf validator already checks this, so it will never trigger usually + throw new AssertionError("Configured mmap directory should be writable", ioe); + } + } else { + cacheDir = null; + } int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount); arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE)); if (LlapIoImpl.LOG.isInfoEnabled()) { - LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") - + " buffers; allocation sizes " + minAllocation + " - " + maxAllocation + LlapIoImpl.LOG.info("Buddy allocator with " + (isDirect ? "direct" : "byte") + " buffers;" + + (isMapped ? (" memory mapped off " + cacheDir.toString() + "; ") : "") + + "allocation sizes " + minAllocation + " - " + maxAllocation + ", arena size " + arenaSizeVal + ". total size " + maxSizeVal); } @@ -272,6 +315,26 @@ public long getMaxCacheSize() { return maxSize; } + private ByteBuffer preallocate(int arenaSize) { + if (isMapped) { + Preconditions.checkArgument(isDirect, "All memory mapped allocations have to be direct buffers"); + try { + File rf = File.createTempFile("arena-", ".cache", cacheDir.toFile()); + RandomAccessFile rwf = new RandomAccessFile(rf, "rw"); + rwf.setLength(arenaSize); // truncate (TODO: posix_fallocate?) + ByteBuffer rwbuf = rwf.getChannel().map(MapMode.PRIVATE, 0, arenaSize); + // A mapping, once established, is not dependent upon the file channel that was used to + // create it. delete file and hold onto the map + rwf.close(); + rf.delete(); + return rwbuf; + } catch (IOException ioe) { + LlapIoImpl.LOG.warn("Failed trying to allocate memory mapped arena", ioe); + } + } + return isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize); + } + private class Arena { private ByteBuffer data; // Avoid storing headers with data since we expect binary size allocations. @@ -280,7 +343,7 @@ public long getMaxCacheSize() { private FreeList[] freeLists; void init() { - data = isDirect ? ByteBuffer.allocateDirect(arenaSize) : ByteBuffer.allocate(arenaSize); + data = preallocate(arenaSize); int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2); headers = new byte[maxMinAllocs]; int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 8cd6df7..9bee994 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -198,16 +198,23 @@ private void run(String[] args) throws Exception { if (options.getSize() != -1) { if (options.getCache() != -1) { - Preconditions.checkArgument(options.getCache() < options.getSize(), - "Cache has to be smaller than the container sizing"); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) { + // direct heap allocations need to be safer + Preconditions.checkArgument(options.getCache() < options.getSize(), + "Cache has to be smaller than the container sizing"); + } else if (options.getCache() < options.getSize()) { + LOG.warn("Note that this might need YARN physical memory monitoring to be turned off (yarn.nodemanager.pmem-check-enabled=false)"); + } } if (options.getXmx() != -1) { Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory has to be smaller than the container sizing"); } - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT) + && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) { + // direct and not memory mapped Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(), - "Working memory + cache has to be smaller than the containing sizing "); + "Working memory + cache has to be smaller than the container sizing "); } }