Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (revision f9619b0b974fc767bbd8c579f7bbaa028dce6fe4)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
@@ -88,6 +88,10 @@
     return info.mapSpillRecord.getIndex(reduce);
   }
 
+  IndexInformation getIndexInformation(String mapId) {
+    return cache.get(mapId);
+  }
+
   private boolean isUnderConstruction(IndexInformation info) {
     synchronized(info) {
       return (null == info.mapSpillRecord);
@@ -185,7 +189,7 @@
     }
   }
 
-  private static class IndexInformation {
+  protected static class IndexInformation {
     SpillRecord mapSpillRecord;
 
     int getSize() {
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapDataCache.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.mapred;
+
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapred.IndexCache.IndexInformation;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+
+class MapDataCache {
+
+  private static final Log LOG = LogFactory.getLog(MapDataCache.class);
+
+  private final long maxFileSize;
+  private final long maxRecordSize;
+  private final long totalDataMemoryAllowed;
+  private final int maxCacheFileNumber;
+  private volatile AtomicLong totalDataMemoryUsed = new AtomicLong();
+  private final ConcurrentHashMap<String, DataInformation> dataCache =
+      new ConcurrentHashMap<>();
+  private final LinkedBlockingQueue<String> dataQueue =
+      new LinkedBlockingQueue<>();
+
+  private final LinkedBlockingQueue<String> lastDataQueue =
+      new LinkedBlockingQueue<>();
+  private final Set<String> lastDataSet =
+      Collections.synchronizedSet(Sets.newHashSet());
+
+  private static final int DEFAULT_CHUNK_SIZE = 8192;
+
+  public MapDataCache(JobConf conf) {
+    maxCacheFileNumber =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_FILE_NUMBER, 500);
+    maxFileSize =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 64) * 1024 * 1024L;
+    maxRecordSize =
+        conf.getInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 32) * 1024L;
+    totalDataMemoryAllowed =
+        conf.getInt(TTConfig.TT_DATA_CACHE, 1024) * 1024 * 1024L;
+
+    LOG.info("DataCache created with max memory = " + totalDataMemoryAllowed);
+  }
+
+  /**
+   * Gets the cached map output data for the given mapId and reduce. Once read, the data is
+   * removed from the cache, because each cached partition can only be served once. Returns
+   * null if the data is not cached and the file is not suitable for caching.
+   */
+  byte[] getDataInformation(String mapId, int reduce,
+                            IndexInformation indexInfo, Path dataFileName,
+                            String expectedIndexOwner) throws IOException {
+    // Need to cache the file
+    DataInformation dataInfo = dataCache.get(mapId);
+    if (dataInfo == null) {
+      if (null == indexInfo) {
+        LOG.info("Could not find index cache data for mapId(" + mapId + ").");
+        return null;
+      }
+
+      // IndexInformation size is 0
+      int indexSize = indexInfo.getSize();
+      if (indexSize == 0) {
+        LOG.info("IndexInformation size for mapId(" + mapId + ") is 0.");
+        return null;
+      }
+
+      // The file or the record size is too big.
+      IndexRecord lastReduce = indexInfo.mapSpillRecord
+          .getIndex(indexInfo.mapSpillRecord.size() - 1);
+      long fileLength = lastReduce.startOffset + lastReduce.partLength;
+      long recordSize = fileLength * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / indexSize;
+      if (fileLength > maxFileSize || recordSize > maxRecordSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("File length or record size is too big. "
+              + "maxFileSize=" + maxFileSize + ", current file length is " + fileLength
+              + ", maxRecordSize=" + maxRecordSize + ", current record size is " + recordSize);
+        }
+        return null;
+      }
+
+      dataInfo = dataCache.computeIfAbsent(mapId, k -> {
+        dataQueue.add(mapId);
+        return new DataInformation();
+      });
+
+      // The data file is read into the cache at most once
+      dataInfo.init(this, dataFileName, mapId, indexInfo, expectedIndexOwner);
+    }
+
+    // Cached data can only be used once
+    byte[] buf = dataInfo.removeOutputData(reduce);
+    if (buf != null) {
+      long memory = totalDataMemoryUsed.addAndGet(-buf.length);
+      if (memory == 0) {
+        LOG.info("DataCache HIT: MapId " + mapId + ", reduce:" + reduce
+            + ", length:" + buf.length + ", memory:" + memory);
+      }
+    }
+    return buf;
+  }
+
+  /**
+   * Add memory to totalDataMemoryUsed, evicting cached map outputs first if the limit is exceeded.
+   */
+  private void addMemory(long memory) {
+    // avoid a dead loop when all the mapIds are under construction
+    int max = dataQueue.size();
+    // memory usage is too big
+    while (totalDataMemoryUsed.get() > totalDataMemoryAllowed
+        && dataQueue.size() > 0 && max-- > 0) {
+      String mapId = dataQueue.remove();
+      DataInformation dataInfo = dataCache.get(mapId);
+      if (dataInfo != null) {
+        // Cannot remove it while it is still under construction
+        if (dataInfo.isUnderConstruct) { // avoid dead lock
+          dataQueue.add(mapId);
+        } else {
+          long size = dataInfo.getSizeAndClear();
+          totalDataMemoryUsed.addAndGet(-size);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleted mapId: " + mapId + ", freed dataSize: " + size
+                + ", current memory: " + totalDataMemoryUsed.get()
+                + ", because memory is not enough.");
+          }
+        }
+      }
+      if (max == 0) {
+        LOG.warn("All the mapIds are under construction! "
+            + "dataQueue.size=" + dataQueue.size()
+            + ", current memory: " + totalDataMemoryUsed.get());
+      }
+    }
+
+    totalDataMemoryUsed.addAndGet(memory);
+  }
+
+  void enqueue(String mapId) {
+    // avoid a dead loop when all the mapIds are under construction
+    int max = lastDataQueue.size();
+    while (lastDataQueue.size() > maxCacheFileNumber && max-- > 0) {
+      String removingMapId = lastDataQueue.remove();
+      lastDataSet.remove(removingMapId);
+      DataInformation dataInfo = dataCache.get(removingMapId);
+
+      if (dataInfo != null) {
+        // Cannot remove it while it is still under construction
+        if (dataInfo.isUnderConstruct) { // avoid dead lock
+          lastDataQueue.add(removingMapId);
+          lastDataSet.add(removingMapId);
+        } else {
+          dataQueue.remove(removingMapId);
+          dataCache.remove(removingMapId);
+          long size = dataInfo.getSizeAndClear();
+          if (size > 0) {
+            totalDataMemoryUsed.addAndGet(-size);
+          }
+        }
+      }
+      if (max == 0) {
+        LOG.warn("All the mapIds are under construction! "
+            + "lastDataQueue.size=" + lastDataQueue.size()
+            + ", current memory: " + totalDataMemoryUsed.get());
+      }
+    }
+
+    if (!lastDataSet.contains(mapId)) {
+      lastDataQueue.add(mapId);
+      lastDataSet.add(mapId);
+    }
+  }
+
+  long getDataCacheUsage() {
+    return totalDataMemoryUsed.get();
+  }
+
+  private static class DataInformation {
+
+    private volatile boolean isUnderConstruct = true;
+    final ConcurrentHashMap<Integer, byte[]> recordDataCache =
+        new ConcurrentHashMap<>();
+
+    private synchronized void init(MapDataCache dataCache, Path dataFileName,
+                                   String mapId, IndexInformation indexInfo,
+                                   String expectedOwner)
+        throws IOException {
+      if (isUnderConstruct) {
+        readDataFileToCache(dataCache, dataFileName, indexInfo, expectedOwner);
+        isUnderConstruct = false;
+        dataCache.enqueue(mapId);
+        notifyAll();
+      }
+    }
+
+    /**
+     * Read the data file into the cache.
+     */
+    private void readDataFileToCache(MapDataCache dataCache, Path dataFileName,
+        IndexInformation indexInfo, String expectedOwner) throws IOException {
+      FileChannel channel = null;
+      RandomAccessFile spill = null;
+      final File spillFile = new File(dataFileName.toString());
+      try {
+        int reduce = 0;
+        IndexRecord index;
+        SpillRecord record = indexInfo.mapSpillRecord;
+        int recordSize = record.size();
+        // find the first IndexRecord whose partLength is not 0
+        do {
+          index = record.getIndex(reduce++);
+        } while (reduce < recordSize && index.partLength == 0);
+        // Every partLength is 0, or there is only one IndexRecord
+        if (reduce == recordSize) {
+          return;
+        }
+
+        spill = SecureIOUtils.openForRandomRead(spillFile, "r",
+            expectedOwner, null);
+        long fileLength = spill.length();
+        channel = spill.getChannel();
+        int chunkSize = Math.min(DEFAULT_CHUNK_SIZE, (int) fileLength);
+        ByteBuffer tmpBuf = ByteBuffer.allocate(chunkSize);
+        byte[] outputDataBuf = getOutputDataOrCreate(dataCache, reduce - 1,
+            (int) index.partLength);
+        int outputDataIndex = 0;
+        long offset = 0;
+        long endOffset = index.startOffset + index.partLength;
+
+        while (channel.read(tmpBuf) != -1) {
+          tmpBuf.flip();
+
+          // transfer the read data to the cache
+          while (tmpBuf.hasRemaining()) {
+            if (offset == endOffset) {
+              outputDataIndex = 0;
+              do {
+                index = record.getIndex(reduce++);
+              } while (reduce < recordSize && index.partLength == 0);
+              outputDataBuf = getOutputDataOrCreate(dataCache, reduce - 1,
+                  (int) index.partLength);
+              endOffset = index.startOffset + index.partLength;
+            }
+            outputDataBuf[outputDataIndex++] = tmpBuf.get();
+            offset++;
+          }
+          tmpBuf.clear();
+        }
+      } catch (Throwable e) {
+        LOG.warn("Read data to cache failed! " + dataFileName, e);
+        dataCache.addMemory(-getSizeAndClear());
+      } finally {
+        if (channel != null) {
+          channel.close();
+        }
+        if (spill != null) {
+          spill.close();
+        }
+      }
+    }
+
+    /**
+     * Remove and return the cached output data for the given partition, waiting while the
+     * cache entry is still under construction.
+     */
+    private byte[] removeOutputData(int partition) {
+      if (isUnderConstruct) {
+        synchronized (this) {
+          if (isUnderConstruct) {
+            try {
+              wait();
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      }
+      return recordDataCache.remove(partition);
+    }
+
+    /**
+     * Get, or create and account for, the output data buffer of the given partition.
+     */
+    private byte[] getOutputDataOrCreate(MapDataCache dataCache,
+                                         int partition, int capacity) {
+      if (capacity > 0) {
+        dataCache.addMemory(capacity);
+      }
+      return recordDataCache.computeIfAbsent(partition, k ->
+          capacity > 0 ? new byte[capacity] : null);
+    }
+
+    long getSize() {
+      long size = 0;
+      for (byte[] buffer : recordDataCache.values()) {
+        if (buffer != null) {
+          size += buffer.length;
+        }
+      }
+      return size;
+    }
+
+    long getSizeAndClear() {
+      Enumeration<Integer> keys = recordDataCache.keys();
+      long size = 0;
+      while (keys.hasMoreElements()) {
+        Integer reduce = keys.nextElement();
+        byte[] data = removeOutputData(reduce);
+        if (data != null) {
+          size += data.length;
+        }
+      }
+      return size;
+    }
+  }
+}
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (revision f9619b0b974fc767bbd8c579f7bbaa028dce6fe4)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
@@ -35,4 +35,13 @@
     "mapreduce.tasktracker.map.tasks.maximum";
   public static final String TT_RESOURCE_CALCULATOR_PLUGIN =
     "mapreduce.tasktracker.resourcecalculatorplugin";
+
+  public static final String TT_DATA_CACHE =
+    "mapreduce.tasktracker.datacache.mb";
+  public static final String TT_DATA_CACHE_MAX_FILE_NUMBER =
+    "mapreduce.tasktracker.datacache.max.file.number";
+  public static final String TT_DATA_CACHE_MAX_FILE =
+    "mapreduce.tasktracker.datacache.max.file.size.mb";
+  public static final String TT_DATA_CACHE_MAX_RECORD =
+    "mapreduce.tasktracker.datacache.max.record.size.kb";
 }
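The TTConfig hunk above adds the four tunables that MapDataCache reads in its constructor. The following is a minimal illustrative sketch, not part of the patch, of how they could be set on a JobConf; the class name DataCacheConfigExample is invented here, and the values shown are simply the defaults MapDataCache falls back to when the keys are unset.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

public class DataCacheConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Defaults used by MapDataCache when the keys are not configured.
    conf.setInt(TTConfig.TT_DATA_CACHE, 1024);                // total cache memory, in MB
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE_NUMBER, 500); // max number of cached map outputs
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 64);         // largest cacheable map output file, in MB
    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 32);       // largest cacheable per-reduce record, in KB
    System.out.println("Data cache bytes allowed: "
        + conf.getInt(TTConfig.TT_DATA_CACHE, 1024) * 1024 * 1024L);
  }
}

The per-file and per-record caps keep oversized map outputs out of the in-memory cache, so they continue to be served through the existing file-based path.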
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapDataCache.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TestMapDataCache extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestMapDataCache.class);
+
+  private JobConf conf;
+  private FileSystem fs;
+  private Path p;
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new JobConf();
+    fs = FileSystem.getLocal(conf).getRaw();
+    p = new Path(System.getProperty("test.build.data", "/tmp"),
+        "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  public void testReadDataFileToCache() throws Exception {
+    fs.delete(p, true);
+    conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
+    final int partsPerMap = 1000;
+    final int bytesPerFile = partsPerMap * 24;
+    IndexCache cache = new IndexCache(conf);
+    MapDataCache dataCache = new MapDataCache(conf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String mapId = "attempt_1590659776734_1485580_m_064744_0";
+
+    // fill cache
+    String path = p.toUri().getPath();
+    Path indexPath = new Path(path, "file.out.index");
+    Path dataPath = new Path(path, "file.out");
+    writeIndexFile(fs, indexPath, bytesPerFile, partsPerMap);
+    writeDataFile(fs, indexPath, dataPath, partsPerMap, cache, mapId, user);
+
+    try {
+      RandomAccessFile spill = SecureIOUtils.openForRandomRead(
+          new File(dataPath.toString()), "r", user, null);
+
+      for (int i = 0; i < partsPerMap; i++) {
+        checkCacheData(spill, i, cache, dataCache, mapId, indexPath, user, dataPath);
+      }
+
+      spill.close();
+    } catch (IOException e) {
+      if (!(e.getCause() instanceof ChecksumException)) {
+        throw e;
+      }
+    }
+  }
+
+  public void testDataCacheWithMultipleThread() throws Exception {
+    fs.delete(p, true);
+    conf.setInt(TTConfig.TT_DATA_CACHE, 10);
+    final int partsPerMap = 1000;
+    final int bytesPerFile = partsPerMap * 24;
+    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE_NUMBER, 1000);
+    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_FILE, 1);
+    conf.setInt(TTConfig.TT_DATA_CACHE_MAX_RECORD, 6000);
+    conf.setInt(TTConfig.TT_DATA_CACHE, 10);
+    IndexCache cache = new IndexCache(conf);
+    MapDataCache dataCache = new MapDataCache(conf);
+    int threadNum = 50;
+    int batch = partsPerMap / threadNum;
+    ExecutorService pool = Executors.newFixedThreadPool(threadNum);
+    ArrayList<Future<Integer>> futureList = new ArrayList<>();
+
+    AtomicInteger total = new AtomicInteger();
+    List<Callable<Integer>> allTasks = new ArrayList<>();
+    for (int num = 0; num < 600; num++) {
+      // fill cache
+      String path = p.toUri().getPath();
+      Path indexPath = new Path(path, "file-" + num + ".out.index");
+      Path dataPath = new Path(path, "file-" + num + ".out");
+      writeIndexFile(fs, indexPath,
+          bytesPerFile, partsPerMap);
+
+      String mapId = "attempt_1590659776734_1485580_m_064744_" + num;
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      writeDataFile(fs, indexPath, dataPath, partsPerMap, cache, mapId, user);
+
+      for (int i = 0; i < threadNum; i++) {
+        final int startReduce = i * batch;
+        allTasks.add(() -> {
+          checkAllData(partsPerMap, cache, dataCache, batch, total,
+              indexPath, dataPath, mapId, user, startReduce);
+          return 0;
+        });
+      }
+    }
+
+    Random random = new Random();
+    int size = allTasks.size();
+    for (int i = 0; i < size; i++) {
+      int index = random.nextInt(size - i);
+      Future<Integer> future = pool.submit(allTasks.remove(index));
+      futureList.add(future);
+    }
+
+    int totalNum = 0;
+    for (Future<Integer> future : futureList) {
+      totalNum = totalNum + future.get();
+    }
+
+    assertEquals(0, dataCache.getDataCacheUsage());
+    assertEquals(0, totalNum);
+    pool.shutdown();
+  }
+
+  private void checkAllData(int partsPerMap, IndexCache cache, MapDataCache dataCache, int batch,
+      AtomicInteger total, Path indexPath, Path dataPath, String mapId, String user,
+      int startReduce) throws IOException {
+    RandomAccessFile spill = null;
+    try {
+      spill = SecureIOUtils.openForRandomRead(
+          new File(dataPath.toString()), "r", user, null);
+      for (int j = startReduce; j < startReduce + batch && j < partsPerMap; j++) {
+        total.incrementAndGet();
+        checkCacheData(spill, j, cache, dataCache, mapId, indexPath, user, dataPath);
+      }
+      spill.close();
+    } finally {
+      if (null != spill) {
+        try {
+          spill.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  /**
+   * Check that the cached data matches the bytes in the spill file.
+   */
+  private void checkCacheData(RandomAccessFile spill, int reduce, IndexCache cache,
+                              MapDataCache dataCache, String mapId, Path indexPath,
+                              String user, Path dataPath) throws IOException {
+    IndexRecord indexRecord = cache.getIndexInformation(mapId, reduce, indexPath, user);
+    byte[] buffer = dataCache.getDataInformation(mapId, reduce,
+        cache.getIndexInformation(mapId), dataPath, user);
+
+    if (buffer == null) {
+      LOG.debug(mapId + " : The buffer of Reduce " + reduce + " is null.");
+      return;
+    }
+
+    byte[] bytes = new byte[(int) indexRecord.partLength];
+    spill.seek(indexRecord.startOffset);
+    spill.read(bytes);
+
+    for (int i = 0; i < indexRecord.partLength; i++) {
+      assertEquals(buffer[i], bytes[i]);
+    }
+  }
+
+  private static void writeIndexFile(FileSystem fs, Path f, long fill, int parts)
+      throws IOException {
+    DataOutputStream dout = null;
+    try {
+      FSDataOutputStream out = fs.create(f, false);
+      CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
+      dout = new DataOutputStream(iout);
+
+      int offset = 0;
+      int bytesPerPart = (int) fill / parts;
+      Random random = new Random();
+      for (int i = 0; i < parts; i++) {
+        long partLength = random.nextInt(bytesPerPart);
+        dout.writeLong(offset);
+        dout.writeLong(partLength);
+        dout.writeLong(partLength);
+        offset += partLength;
+      }
+      out.writeLong(iout.getChecksum().getValue());
+    } finally {
+      if (dout != null) {
+        dout.close();
+      }
+    }
+  }
+
+  private static void writeDataFile(FileSystem fs, Path indexPath,
+                                    Path dataPath, int parts,
+                                    IndexCache cache, String mapId, String user)
+      throws IOException {
+    final File file = new File(dataPath.toString());
+    try (RandomAccessFile out = SecureIOUtils.openForRandomRead(file, "rw",
+        user, null)) {
+      for (int reduce = 0; reduce < parts; ++reduce) {
+        IndexRecord indexRecord = cache.getIndexInformation(mapId, reduce, indexPath, user);
+        out.seek(indexRecord.startOffset);
+        for (int j = 0; j < indexRecord.partLength; j++) {
+          out.writeByte((byte) (reduce & 0xFF));
+        }
+      }
+    }
+  }
+}
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (revision f9619b0b974fc767bbd8c579f7bbaa028dce6fe4)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (revision a88bf85c206ceb71fff6e92bbd209f24369d295a)
@@ -850,6 +850,7 @@
   class Shuffle extends SimpleChannelUpstreamHandler {
 
     private final IndexCache indexCache;
+    private final MapDataCache dataCache;
     private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache;
 
@@ -858,6 +859,7 @@
     Shuffle(Configuration conf) {
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
       this.indexCache = new IndexCache(new JobConf(conf));
+      this.dataCache = new MapDataCache(new JobConf(conf));
       this.pathCache = CacheBuilder.newBuilder()
          .expireAfterAccess(conf.getInt(EXPIRE_AFTER_ACCESS_MINUTES,
              DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES), TimeUnit.MINUTES)
@@ -1255,40 +1257,49 @@
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      final File spillfile =
-          new File(mapOutputInfo.mapOutputFileName.toString());
-      RandomAccessFile spill;
-      try {
-        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
-      } catch (FileNotFoundException e) {
-        LOG.info(spillfile + " not found");
-        return null;
-      }
-      ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            info.startOffset, info.partLength, manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath(),
-            shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = ch.write(partition);
-        writeFuture.addListener(new ChannelFutureListener() {
+      ChannelFuture writeFuture;
+      byte[] dataBuffer = dataCache.getDataInformation(mapId, reduce,
+          indexCache.getIndexInformation(mapId),
+          mapOutputInfo.mapOutputFileName, user);
+      if (null != dataBuffer && dataBuffer.length == info.partLength) {
+        // hit data cache
+        writeFuture = ch.write(wrappedBuffer(dataBuffer));
+      } else {
+        final File spillfile =
+            new File(mapOutputInfo.mapOutputFileName.toString());
+        RandomAccessFile spill;
+        try {
+          spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
+        } catch (FileNotFoundException e) {
+          LOG.info(spillfile + " not found");
+          return null;
+        }
+
+        if (ch.getPipeline().get(SslHandler.class) == null) {
+          final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+              info.startOffset, info.partLength, manageOsCache, readaheadLength,
+              readaheadPool, spillfile.getAbsolutePath(),
+              shuffleBufferSize, shuffleTransferToAllowed);
+          writeFuture = ch.write(partition);
+          writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,
             //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.releaseExternalResources();
-          }
-        });
else { - // HTTPS cannot be done with zero copy. - final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - info.startOffset, info.partLength, sslFileBufferSize, - manageOsCache, readaheadLength, readaheadPool, - spillfile.getAbsolutePath()); - writeFuture = ch.write(chunk); + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + partition.transferSuccessful(); + } + partition.releaseExternalResources(); + } + }); + } else { + // HTTPS cannot be done with zero copy. + final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + info.startOffset, info.partLength, sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + spillfile.getAbsolutePath()); + writeFuture = ch.write(chunk); + } } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic