diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java index 57f29d8..9dfd40b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/FadvisedFileRegion.java @@ -43,13 +43,14 @@ private final int shuffleBufferSize; private final boolean shuffleTransferToAllowed; private final FileChannel fileChannel; + private final boolean canEvictAfterTransfer; private ReadaheadRequest readaheadRequest; public FadvisedFileRegion(RandomAccessFile file, long position, long count, boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, String identifier, int shuffleBufferSize, - boolean shuffleTransferToAllowed) throws IOException { + boolean shuffleTransferToAllowed, boolean canEvictAfterTransfer) throws IOException { super(file.getChannel(), position, count); this.manageOsCache = manageOsCache; this.readaheadLength = readaheadLength; @@ -61,6 +62,8 @@ public FadvisedFileRegion(RandomAccessFile file, long position, long count, this.position = position; this.shuffleBufferSize = shuffleBufferSize; this.shuffleTransferToAllowed = shuffleTransferToAllowed; + // To indicate whether the pages should be thrown away or not. + this.canEvictAfterTransfer = canEvictAfterTransfer; } @Override @@ -149,9 +152,12 @@ public void releaseExternalResources() { public void transferSuccessful() { if (manageOsCache && getCount() > 0) { try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, - fd, getPosition(), getCount(), - NativeIO.POSIX.POSIX_FADV_DONTNEED); + if (canEvictAfterTransfer) { + LOG.debug("shuffleBufferSize: {}, path: {}", shuffleBufferSize, identifier); + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, + fd, getPosition(), getCount(), + NativeIO.POSIX.POSIX_FADV_DONTNEED); + } } catch (Throwable t) { LOG.warn("Failed to manage OS cache for " + identifier, t); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index f11f0e8..f22fd01 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -130,6 +130,10 @@ public static final String SHUFFLE_MANAGE_OS_CACHE = "llap.shuffle.manage.os.cache"; public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; + public static final String SHUFFLE_OPTIMIZE_OS_CACHE_EVICT = + "llap.shuffle.os.cache.optimize.evict"; + public static final boolean DEFAULT_SHUFFLE_OPTIMIZE_OS_CACHE_EVICT = true; + public static final String SHUFFLE_READAHEAD_BYTES = "llap.shuffle.readahead.bytes"; public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; @@ -156,6 +160,7 @@ * sendfile */ private final boolean manageOsCache; + private final boolean optimizeOsCacheEviction; private final int readaheadLength; private final int maxShuffleConnections; private final int shuffleBufferSize; @@ -255,6 +260,8 @@ private ShuffleHandler(Configuration conf) { this.conf = conf; manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, DEFAULT_SHUFFLE_MANAGE_OS_CACHE); + optimizeOsCacheEviction = conf.getBoolean(SHUFFLE_OPTIMIZE_OS_CACHE_EVICT, + DEFAULT_SHUFFLE_OPTIMIZE_OS_CACHE_EVICT); readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, DEFAULT_SHUFFLE_READAHEAD_BYTES); @@ -316,6 +323,14 @@ private ShuffleHandler(Configuration conf) { LOG.info("DirWatcher disabled by config"); dirWatcher = null; } + LOG.info("manageOsCache:{}, optimizeOsCacheEviction:{}, readaheadLength:{}" + + ", maxShuffleConnections:{}, localDirs:{}" + + ", shuffleBufferSize:{}, shuffleTransferToAllowed:{}" + + ", connectionKeepAliveEnabled:{}, connectionKeepAliveTimeOut:{}" + + ", mapOutputMetaInfoCacheSize:{}, sslFileBufferSize:{}", + manageOsCache, optimizeOsCacheEviction,readaheadLength, maxShuffleConnections, localDirs, + shuffleBufferSize, shuffleTransferToAllowed, connectionKeepAliveEnabled, + connectionKeepAliveTimeOut, mapOutputMetaInfoCacheSize, sslFileBufferSize); } @@ -971,10 +986,14 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { + boolean canEvictAfterTransfer = (reduce != 0); // e.g broadcast data + if (!optimizeOsCacheEviction) { + canEvictAfterTransfer = true; + } final FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength, readaheadPool, spillfile.getAbsolutePath(), - shuffleBufferSize, shuffleTransferToAllowed); + shuffleBufferSize, shuffleTransferToAllowed, canEvictAfterTransfer); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { // TODO error handling; distinguish IO/connection failures,