Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java	(revision a43510e21d01e6c78e98e7ad9469cbea70a66466)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
@@ -88,6 +88,10 @@
     return info.mapSpillRecord.getIndex(reduce);
   }
 
+  IndexInformation getIndexInformation(String mapId) {
+    return cache.get(mapId);
+  }
+
   private boolean isUnderConstruction(IndexInformation info) {
     synchronized(info) {
       return (null == info.mapSpillRecord);
@@ -185,7 +189,7 @@
     }
   }
 
-  private static class IndexInformation {
+  static class IndexInformation {
     SpillRecord mapSpillRecord;
 
     int getSize() {
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.mapred;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapred.IndexCache.IndexInformation;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+
+class MapDataCache {
+
+  private static final Log LOG = LogFactory.getLog(MapDataCache.class);
+
+  private final long maxFileSize;
+  private final long maxRecordSize;
+  private final long totalDataMemoryAllowed;
+  private final int maxCacheFileNumber;
+  private final AtomicLong totalDataMemoryUsed = new AtomicLong();
+  private final ConcurrentHashMap<String, DataInformation> dataCache =
+      new ConcurrentHashMap<>();
+  private final LinkedBlockingQueue<String> dataQueue =
+      new LinkedBlockingQueue<>();
+  private static final int DEFAULT_CHUNK_SIZE = 8192;
+
+  public MapDataCache(JobConf conf) {
+    maxCacheFileNumber =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_FILE_NUMBER, 1000);
+    maxFileSize =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 200) * 1024 * 1024L;
+    maxRecordSize =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 64) * 1024L;
+    totalDataMemoryAllowed =
+        conf.getInt(TTConfig.TT_DATA_CACHE, 2048) * 1024 * 1024L;
+
+    Preconditions.checkArgument(maxFileSize > 100 * maxRecordSize,
+        "Invalid value configured for "
+            + TTConfig.TT_DATA_CACHE_MAX_RECORD
+            + ", it must be smaller than "
+            + TTConfig.TT_DATA_CACHE_MAX_FILE + " / 100.");
+
+    Preconditions.checkArgument(
+        totalDataMemoryAllowed > 10 * maxFileSize,
+        "Invalid value configured for "
+            + TTConfig.TT_DATA_CACHE_MAX_FILE
+            + ", it must be smaller than "
+            + TTConfig.TT_DATA_CACHE + " / 10.");
+
+    LOG.info("DataCache created with max memory = " + totalDataMemoryAllowed);
+  }
+
+  /**
+   * Gets the cached map output data for the given mapId and reduce. Once read, the data is
+   * removed from the cache because each cached partition is consumed only once. Returns null
+   * if the data is not cached or the file is not suitable for caching.
+   */
+  public ByteBuffer getDataInformation(String mapId, int reduce,
+      IndexInformation indexInfo, Path dataFileName,
+      String expectedIndexOwner) throws IOException {
+    // Need to cache the file
+    DataInformation dataInfo = dataCache.get(mapId);
+    if (dataInfo == null) {
+      if (null == indexInfo) {
+        LOG.info("Could not find index cache data for mapId(" + mapId + ").");
+        return null;
+      }
+
+      // IndexInformation size is 0
+      int indexSize = indexInfo.getSize();
+      if (indexSize == 0) {
+        LOG.info("IndexInformation size for mapId(" + mapId + ") is 0.");
+        return null;
+      }
+
+      // The file or the record size is too big.
+      IndexRecord lastReduce = indexInfo.mapSpillRecord
+          .getIndex(indexInfo.mapSpillRecord.size() - 1);
+      long fileLength = lastReduce.startOffset + lastReduce.partLength;
+      long recordSize = fileLength * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / indexSize;
+      if (fileLength > maxFileSize || recordSize > maxRecordSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("File length or record size is too big. "
+              + "maxFileSize=" + maxFileSize
+              + ", current file length is " + fileLength
+              + ", maxRecordSize=" + maxRecordSize
+              + ", current record size is " + recordSize);
+        }
+        return null;
+      }
+
+      dataInfo = dataCache.computeIfAbsent(mapId, k -> {
+        dataQueue.add(mapId);
+        return new DataInformation();
+      });
+
+      // The data file is read into the cache at most once
+      dataInfo.init(this, dataFileName, mapId, indexInfo,
+          dataInfo, expectedIndexOwner);
+    }
+
+    // Cached data is only used once
+    ByteBuffer buf = dataInfo.removeOutputData(reduce);
+    if (buf != null) {
+      long memory = totalDataMemoryUsed.addAndGet(-buf.limit());
+      if (memory == 0) {
+        LOG.info("DataCache HIT: MapId " + mapId + ", reduce:" + reduce
+            + ", limit:" + buf.limit() + ", capacity:" + buf.capacity()
+            + ", memory:" + memory);
+      }
+    }
+
+    return buf;
+  }
+
+  long getDataCacheUsage() {
+    return totalDataMemoryUsed.get();
+  }
+
+  /**
+   * Read the map output data file into the cache.
+   */
+  private void readDataFileToCache(Path dataFileName, String mapId,
+      IndexInformation indexInfo,
+      DataInformation dataInfo, String expectedOwner) throws IOException {
+    FileChannel channel = null;
+    RandomAccessFile spill = null;
+    final File spillFile = new File(dataFileName.toString());
+    try {
+      int reduce = 0;
+      IndexRecord index;
+      SpillRecord record = indexInfo.mapSpillRecord;
+      int recordSize = record.size();
+      // find the first IndexRecord whose partLength is not 0
+      do {
+        index = record.getIndex(reduce++);
+      } while (reduce < recordSize && index.partLength == 0);
+      // all partLengths are 0, or there is only a single IndexRecord
+      if (reduce == recordSize) {
+        LOG.warn(mapId + ", reduce == recordSize, " + dataFileName.toString());
+        return;
+      }
+
+      spill = SecureIOUtils.openForRandomRead(spillFile, "r",
+          expectedOwner, null);
+      long fileLength = spill.length();
+      channel = spill.getChannel();
+      int chunkSize = Math.min(DEFAULT_CHUNK_SIZE, (int) fileLength);
+      ByteBuffer tmpBuf = ByteBuffer.allocate(chunkSize);
+      ByteBuffer outputDataBuf = dataInfo.getOutputDataOrCreate(reduce - 1,
+          (int) index.partLength);
+      long offset = 0;
+      long endOffset = index.startOffset + index.partLength;
+
+      while (channel.read(tmpBuf) != -1) {
+        tmpBuf.flip();
+
+        // transfer the read data to the cache
+        while (tmpBuf.hasRemaining()) {
+          if (offset == endOffset) {
+            outputDataBuf.flip();
+            do {
+              index = record.getIndex(reduce++);
+            } while (reduce < recordSize && index.partLength == 0);
+            outputDataBuf = dataInfo.getOutputDataOrCreate(reduce - 1,
+                (int) index.partLength);
+            endOffset = index.startOffset + index.partLength;
+          }
+          outputDataBuf.put(tmpBuf.get());
+          offset++;
+        }
+        tmpBuf.clear();
+      }
+      // flip the last output buffer
+      outputDataBuf.flip();
+    } catch (Throwable e) {
+      // Nothing has been accounted in totalDataMemoryUsed yet, so just drop
+      // the partially read data.
+      LOG.warn("Read data to cache failed! " + dataFileName, e);
+      dataInfo.clear();
+    } finally {
+      if (channel != null) {
+        channel.close();
+      }
+      if (spill != null) {
+        spill.close();
+      }
+    }
+  }
+
+  /**
+   * Bring memory usage below totalDataMemoryAllowed.
+   */
+  private synchronized void freeDataInformation() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("freeDataInformation invoked: current memory: "
+          + totalDataMemoryUsed.get()
+          + ", dataQueue size:" + dataQueue.size()
+          + ", dataCache size:" + dataCache.size());
+    }
+
+    // if the number of cached map files exceeds maxCacheFileNumber, evict some entries
+    if (dataQueue.size() > maxCacheFileNumber) {
+      for (int i = 0; i < maxCacheFileNumber / 20; i++) {
+        String mapId = dataQueue.remove();
+        DataInformation dataInfo = dataCache.remove(mapId);
+        if (dataInfo != null) {
+          long size = dataInfo.getSize();
+          totalDataMemoryUsed.addAndGet(-size);
+          LOG.info("Deleted mapId: " + mapId + ", freed dataSize: " + size);
+        }
+      }
+    }
+
+    // memory usage is over the limit; evict until it fits
+    while (totalDataMemoryUsed.get() > totalDataMemoryAllowed
+        && dataQueue.size() > 0) {
+      String mapId = dataQueue.remove();
+      DataInformation dataInfo = dataCache.remove(mapId);
+      if (dataInfo != null) {
+        if (dataCache.size() == 0) {
+          LOG.warn("Deleted mapId: " + mapId + ", current memory: "
+              + totalDataMemoryUsed.get() + ", reset dataSize to 0.");
+          totalDataMemoryUsed.set(0);
+        } else {
+          long size = dataInfo.getSize();
+          totalDataMemoryUsed.addAndGet(-size);
+          LOG.info("Deleted mapId: " + mapId + ", freed dataSize: " + size
+              + " because memory is not enough.");
+        }
+      }
+    }
+  }
+
+  private static class DataInformation {
+
+    final ConcurrentHashMap<Integer, ByteBuffer> recordDataCache =
+        new ConcurrentHashMap<>();
+
+    private boolean isUnderConstruct = true;
+
+    private synchronized void init(MapDataCache dataCache, Path dataFileName,
+        String mapId, IndexInformation indexInfo,
+        DataInformation dataInfo, String expectedOwner)
+        throws IOException {
+      if (isUnderConstruct) {
+        dataCache.freeDataInformation();
+        dataCache.readDataFileToCache(dataFileName, mapId, indexInfo,
+            dataInfo, expectedOwner);
+        dataCache.totalDataMemoryUsed.addAndGet(getSize());
+
+        isUnderConstruct = false;
+        notifyAll();
+      }
+    }
+
+    /**
+     * Remove and return the output data for the given partition.
+     */
+    private ByteBuffer removeOutputData(int partition) {
+      if (isUnderConstruct) {
+        synchronized (this) {
+          if (isUnderConstruct) {
+            try {
+              wait();
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      }
+      return recordDataCache.remove(partition);
+    }
+
+    /**
+     * Get the output buffer for the given partition, creating it if absent.
+     */
+    private ByteBuffer getOutputDataOrCreate(int partition, int capacity) {
+      return recordDataCache.computeIfAbsent(partition, k ->
+          capacity > 0 ? ByteBuffer.allocate(capacity) : null);
+    }
+
+    public boolean isEmpty() {
+      return recordDataCache.isEmpty();
+    }
+
+    /**
+     * Removes all cached data from this map.
+     */
+    private void clear() {
+      recordDataCache.clear();
+    }
+
+    long getSize() {
+      long size = 0;
+      for (ByteBuffer buffer : recordDataCache.values()) {
+        if (buffer != null) {
+          size += buffer.limit();
+        }
+      }
+      return size;
+    }
+  }
+}
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java	(revision a43510e21d01e6c78e98e7ad9469cbea70a66466)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
@@ -31,6 +31,14 @@
   public static final String TT_INDEX_CACHE =
     "mapreduce.tasktracker.indexcache.mb";
+  public static final String TT_DATA_CACHE =
+    "mapreduce.tasktracker.datacache.mb";
+  public static final String TT_DATA_CACHE_MAX_FILE_NUMBER =
+    "mapreduce.tasktracker.datacache.max.file.number";
+  public static final String TT_DATA_CACHE_MAX_FILE =
+    "mapreduce.tasktracker.datacache.max.file.size.mb";
+  public static final String TT_DATA_CACHE_MAX_RECORD =
+    "mapreduce.tasktracker.datacache.max.record.size.kb";
   public static final String TT_MAP_SLOTS =
     "mapreduce.tasktracker.map.tasks.maximum";
   public static final String TT_RESOURCE_CALCULATOR_PLUGIN =
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TestMapDataCache extends TestCase {
+
+  private JobConf conf;
+  private FileSystem fs;
+  private Path p;
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new JobConf();
+    fs = FileSystem.getLocal(conf).getRaw();
+    p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  /**
+   * Test the data cache with multiple threads.
+   * @throws IOException
+   */
+  public void testDataCacheWithMultipleThread()
+      throws IOException, ExecutionException, InterruptedException {
+    fs.delete(p, true);
+    conf.setInt(TTConfig.TT_DATA_CACHE, 10);
+    final int partsPerMap = 1000;
+    final int bytesPerFile = partsPerMap * 240;
+    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 10);
+    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 64);
+    conf.setInt(TTConfig.TT_DATA_CACHE, 512);
+    IndexCache cache = new IndexCache(conf);
+    MapDataCache dataCache = new MapDataCache(conf);
+
+    int threadNum = 50;
+    int batch = partsPerMap / threadNum;
+    ExecutorService pool = Executors.newFixedThreadPool(threadNum);
+    ArrayList<Future<Integer>> futureList = new ArrayList<>();
+
+    AtomicInteger total = new AtomicInteger();
+    List<Callable<Integer>> allTasks = new ArrayList<>();
+    for (int num = 0; num < 100; num++) {
+      // fill the index cache
+      String path = p.toUri().getPath();
+      Path indexPath = new Path(path, "file-" + num + ".out.index");
+      Path dataPath = new Path(path, "file-" + num + ".out");
+      writeIndexFile(fs, indexPath, bytesPerFile, partsPerMap);
+
+      // fill the map output data file
+      String mapId = "attempt_1590659776734_1485580_m_064744_" + num;
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      writeDataFile(indexPath, dataPath, partsPerMap, cache, mapId, user);
+
+      for (int i = 0; i < threadNum; i++) {
+        final int startReduce = i * batch;
+        // generate tasks that consume the cached data
+        allTasks.add(() -> {
+          checkAllData(partsPerMap, cache, dataCache, batch, total,
+              indexPath, dataPath, mapId, user, startReduce);
+          return 0;
+        });
+      }
+    }
+
+    // consume the cached data in random order
+    Random random = new Random();
+    int size = allTasks.size();
+    for (int i = 0; i < size; i++) {
+      int index = random.nextInt(size - i);
+      Future<Integer> future = pool.submit(allTasks.remove(index));
+      futureList.add(future);
+    }
+
+    int result = 0;
+    for (Future<Integer> future : futureList) {
+      result = result + future.get();
+    }
+
+    assertEquals(0, dataCache.getDataCacheUsage());
+    assertEquals(0, result);
+    pool.shutdown();
+  }
+
+  /**
+   * Check all cached data for one map output.
+   */
+  private void checkAllData(int partsPerMap, IndexCache cache,
+      MapDataCache dataCache, int batch,
+      AtomicInteger total,
+      Path indexPath, Path dataPath, String mapId, String user,
+      int startReduce) throws IOException {
+    RandomAccessFile spill = null;
+    try {
+      spill = SecureIOUtils.openForRandomRead(
+          new File(dataPath.toString()), "r", user, null);
+      int endReduce = startReduce + batch;
+      for (int j = startReduce; j < endReduce && j < partsPerMap; j++) {
+        total.incrementAndGet();
+        checkCacheData(spill, j, cache, dataCache,
+            mapId, indexPath, user, dataPath);
+      }
+    } finally {
+      if (null != spill) {
+        try {
+          spill.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  /**
+   * Check that the cached data matches the on-disk data file.
+   */
+  private void checkCacheData(RandomAccessFile spill, int reduce,
+      IndexCache cache, MapDataCache dataCache, String mapId,
+      Path indexPath, String user, Path dataPath)
+      throws IOException {
+    IndexRecord indexRecord = cache.getIndexInformation(
+        mapId, reduce, indexPath, user);
+    ByteBuffer buffer = dataCache.getDataInformation(mapId, reduce,
+        cache.getIndexInformation(mapId), dataPath, user);
+
+    if (buffer == null) {
+      return;
+    }
+
+    byte[] bytes = new byte[(int) indexRecord.partLength];
+    spill.seek(indexRecord.startOffset);
+    spill.read(bytes);
+
+    for (int i = 0; i < indexRecord.partLength; i++) {
+      assertEquals(buffer.get(), bytes[i]);
+    }
+  }
+
+  private static void writeIndexFile(FileSystem fs, Path f,
+      long fill, int parts) throws IOException {
+    DataOutputStream dout = null;
+    try {
+      FSDataOutputStream out = fs.create(f, false);
+      CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
+      dout = new DataOutputStream(iout);
+
+      int offset = 0;
+      int bytesPerPart = (int) fill / parts;
+      Random random = new Random();
+      for (int i = 0; i < parts; i++) {
+        long partLength = random.nextInt(bytesPerPart);
+        dout.writeLong(offset);
+        dout.writeLong(partLength);
+        dout.writeLong(partLength);
+        offset += partLength;
+      }
+      out.writeLong(iout.getChecksum().getValue());
+    } finally {
+      if (dout != null) {
+        dout.close();
+      }
+    }
+  }
+
+  private static void writeDataFile(Path indexPath,
+      Path dataPath, int parts,
+      IndexCache cache, String mapId, String user)
+      throws IOException {
+    final File file = new File(dataPath.toString());
+    try (RandomAccessFile out = SecureIOUtils.openForRandomRead(file, "rw",
+        user, null)) {
+      for (int reduce = 0; reduce < parts; ++reduce) {
+        IndexRecord indexRecord = cache.getIndexInformation(
+            mapId, reduce, indexPath, user);
+        out.seek(indexRecord.startOffset);
+        for (int j = 0; j < indexRecord.partLength; j++) {
+          out.writeByte((byte) (reduce & 0xFF));
+        }
+      }
+    }
+  }
+}
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java	(revision a43510e21d01e6c78e98e7ad9469cbea70a66466)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java	(revision 256fee78e637c9e37bf8e2cf4c1400cd139e4d95)
@@ -845,6 +845,7 @@
     private static final int ALLOWED_CONCURRENCY = 16;
     private final Configuration conf;
     private final IndexCache indexCache;
+    private final MapDataCache dataCache;
     private int port;
     private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
       CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
@@ -892,6 +893,7 @@
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(new JobConf(conf));
+      dataCache = new MapDataCache(new JobConf(conf));
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     }
 
@@ -1272,30 +1274,38 @@
         return null;
       }
       ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            info.startOffset, info.partLength, manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath(),
-            shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new ChannelFutureListener() {
+      ByteBuffer dataBuffer = dataCache.getDataInformation(mapId, reduce,
+          indexCache.getIndexInformation(mapId),
+          mapOutputInfo.mapOutputFileName, user);
+      if (null != dataBuffer && dataBuffer.limit() == info.partLength) {
+        // hit data cache
+        writeFuture = ch.write(wrappedBuffer(dataBuffer));
+      } else {
+        if (ch.getPipeline().get(SslHandler.class) == null) {
+          final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+              info.startOffset, info.partLength, manageOsCache, readaheadLength,
+              readaheadPool, spillfile.getAbsolutePath(),
+              shuffleBufferSize, shuffleTransferToAllowed);
+          writeFuture = ch.write(partition);
+          writeFuture.addListener(new ChannelFutureListener() {
           // TODO error handling; distinguish IO/connection failures,
           //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.releaseExternalResources();
-          }
-        });
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            info.startOffset, info.partLength, sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            spillfile.getAbsolutePath());
-        writeFuture = ch.write(chunk);
+            @Override
+            public void operationComplete(ChannelFuture future) {
+              if (future.isSuccess()) {
+                partition.transferSuccessful();
+              }
+              partition.releaseExternalResources();
+            }
+          });
+        } else {
+          // HTTPS cannot be done with zero copy.
+          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+              info.startOffset, info.partLength, sslFileBufferSize,
+              manageOsCache, readaheadLength, readaheadPool,
+              spillfile.getAbsolutePath());
+          writeFuture = ch.write(chunk);
+        }
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
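
A minimal usage sketch of the new data cache, mirroring TestMapDataCache above. The spill file paths and map attempt id are hypothetical placeholders, and the example assumes it lives in org.apache.hadoop.mapred so the package-private MapDataCache and IndexCache classes are visible; it only illustrates the intended call sequence, not the actual ShuffleHandler wiring.

package org.apache.hadoop.mapred;  // assumed, for package-private access

import java.nio.ByteBuffer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation;

public class MapDataCacheExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.setInt(TTConfig.TT_DATA_CACHE, 512);             // total cache memory, MB
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 10);     // largest cacheable spill file, MB
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 64);   // largest cacheable average record, KB
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE_NUMBER, 1000);

    IndexCache indexCache = new IndexCache(conf);
    MapDataCache dataCache = new MapDataCache(conf);

    // Hypothetical spill files and attempt id; in the shuffle these come from
    // the request URL and the task's local output directory.
    Path indexPath = new Path("/tmp/shuffle/file.out.index");
    Path dataPath = new Path("/tmp/shuffle/file.out");
    String mapId = "attempt_1590659776734_0001_m_000000_0";
    String user = UserGroupInformation.getCurrentUser().getShortUserName();

    // Resolve the partition's IndexRecord first, then ask the data cache for
    // the partition bytes; null means "not cached, serve from disk as before".
    IndexRecord record = indexCache.getIndexInformation(mapId, 0, indexPath, user);
    ByteBuffer cached = dataCache.getDataInformation(mapId, 0,
        indexCache.getIndexInformation(mapId), dataPath, user);
    System.out.println("partLength=" + record.partLength
        + ", cache hit=" + (cached != null));
  }
}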