Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Fixed
None
flink-1.10
Description
I ran into this exception when a hard disk was damaged:
I checked the code and found that Flink creates a temporary file when the record length exceeds 5 MB (`THRESHOLD_FOR_SPILLING`):
```java
// SpillingAdaptiveSpanningRecordDeserializer.java
if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
    // create a spilling channel and put the data there
    this.spillingChannel = createSpillingChannel();

    ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
    FileUtils.writeCompletely(this.spillingChannel, toWrite);
}
```
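To illustrate the spilling path outside of Flink, here is a minimal, self-contained sketch. `SpillSketch`, `maybeSpill` and the local `writeCompletely` are hypothetical names, not Flink's API. The 5 MB threshold mirrors the value mentioned above, the file is created with `File.createTempFile` instead of Flink's random-name loop, and the write loop assumes that `FileUtils.writeCompletely` keeps calling `FileChannel.write` until the buffer is drained, since a single `write` may write fewer bytes than remain.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Random;

class SpillSketch {
    // Assumed to mirror the 5 MB spilling threshold described above.
    static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024;

    // Hypothetical helper: FileChannel.write may write only part of the buffer,
    // so keep writing until nothing remains.
    static void writeCompletely(FileChannel channel, ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            channel.write(src);
        }
    }

    // Hypothetical helper: small records stay in memory (returns null);
    // large ones are written to a file in a randomly chosen temp dir.
    static FileChannel maybeSpill(int recordLength, ByteBuffer chunk,
                                  String[] tempDirs, Random rnd) throws IOException {
        if (recordLength <= THRESHOLD_FOR_SPILLING) {
            return null;
        }
        // Random pick: if this directory sits on a broken disk, the call fails here.
        String dir = tempDirs[rnd.nextInt(tempDirs.length)];
        File spillFile = File.createTempFile("spill", ".inputchannel", new File(dir));
        FileChannel channel = new RandomAccessFile(spillFile, "rw").getChannel();
        writeCompletely(channel, chunk);
        return channel;
    }

    public static void main(String[] args) throws IOException {
        String dir = Files.createTempDirectory("spill-demo").toString();
        ByteBuffer chunk = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        Random rnd = new Random();
        System.out.println(maybeSpill(1024, chunk, new String[] {dir}, rnd)); // prints null
        try (FileChannel ch = maybeSpill(THRESHOLD_FOR_SPILLING + 1, chunk, new String[] {dir}, rnd)) {
            System.out.println(ch.size()); // prints 4
        }
    }
}
```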
The temp directory is picked at random from all `tempDirs`. In YARN mode, each `tempDir` usually corresponds to one hard disk.
In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (`tempDir`) for the spilling channel rather than throw an IOException, which makes the Flink job restart over and over again.
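A rough estimate of why the restarts keep happening, under stated assumptions: the directory is drawn uniformly at random from N temp dirs, exactly one disk is broken, and (as described above) the first IOException fails the task. Each spill then lands on the broken disk with probability 1/N, so over k spills:

```latex
P(\text{at least one of } k \text{ spills fails}) = 1 - \left(\frac{N-1}{N}\right)^{k}

% e.g. N = 4 disks, k = 10 spills:
1 - \left(\tfrac{3}{4}\right)^{10} \approx 1 - 0.0563 = 0.9437
```

So with four disks, a job that spills ten large records would, under these assumptions, fail roughly 94% of the time, and every restart rolls the dice again.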
We could change `SpillingAdaptiveSpanningRecordDeserializer` like this:
```java
// SpillingAdaptiveSpanningRecordDeserializer.java
private FileChannel createSpillingChannel() throws IOException {
    if (spillFile != null) {
        throw new IllegalStateException("Spilling file already exists.");
    }

    // try to find a unique file name for the spilling channel
    int maxAttempts = 10;
    String[] tempDirs = this.tempDirs;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        int dirIndex = rnd.nextInt(tempDirs.length);
        String directory = tempDirs[dirIndex];
        spillFile = new File(directory, randomString(rnd) + ".inputchannel");
        try {
            if (spillFile.createNewFile()) {
                return new RandomAccessFile(spillFile, "rw").getChannel();
            }
        } catch (IOException e) {
            // if there is no tempDir left to try, give up
            if (tempDirs.length <= 1) {
                throw e;
            }
            LOG.warn("Caught an IOException when creating spill file: " + directory + ". Attempt " + attempt, e);
            // drop the failed directory (disk) from the candidates for the remaining attempts
            tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
        }
    }

    // every attempt either hit an existing file name or failed
    throw new IOException("Could not create a spilling file in any of the temp directories.");
}
```
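A self-contained sketch of the same failover idea, runnable without Flink. `SpillFailoverSketch` and `createSpillFile` are hypothetical names. It keeps the candidate directories in an `ArrayList` instead of using `ArrayUtils.remove`, uses `UUID` for file names instead of Flink's `randomString`, and returns the `File` rather than opening a `FileChannel` so the result is easy to check. A directory that does not exist stands in for a damaged disk.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;

class SpillFailoverSketch {
    // Hypothetical stand-in for the proposed createSpillingChannel():
    // pick a random temp dir, and drop any dir whose file creation throws.
    static File createSpillFile(String[] tempDirs, Random rnd, int maxAttempts) throws IOException {
        List<String> candidates = new ArrayList<>(Arrays.asList(tempDirs));
        IOException lastError = null;
        for (int attempt = 0; attempt < maxAttempts && !candidates.isEmpty(); attempt++) {
            int dirIndex = rnd.nextInt(candidates.size());
            String directory = candidates.get(dirIndex);
            File spillFile = new File(directory, UUID.randomUUID() + ".inputchannel");
            try {
                if (spillFile.createNewFile()) {
                    return spillFile;
                }
                // name collision: retry with a fresh random name
            } catch (IOException e) {
                lastError = e;
                candidates.remove(dirIndex); // treat this directory (disk) as failed
            }
        }
        throw new IOException("No usable temp directory for spilling", lastError);
    }

    public static void main(String[] args) throws IOException {
        String good = Files.createTempDirectory("spill-ok").toString();
        String broken = new File(good, "missing/subdir").getPath(); // simulates a dead disk
        File f = createSpillFile(new String[] {broken, good}, new Random(), 10);
        System.out.println(f.getParent().equals(good)); // prints true
    }
}
```

Because a failed directory is removed before the next attempt, one broken disk costs at most one attempt per spill instead of failing the task. The method still ends with an IOException, chaining the last error, once every directory has failed or the attempts run out.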