Flink / FLINK-18811

If a disk is damaged, the TaskManager should choose another disk for the temp dir rather than throw an IOException, which makes the Flink job restart over and over again


Details

    Description

      I hit this exception when a hard disk was damaged (see the attached screenshot, flink_disk_error.png):

      I checked the code and found that Flink creates a temp file when the record length exceeds 5 MB:

       

      // SpillingAdaptiveSpanningRecordDeserializer.java
      if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
         // create a spilling channel and put the data there
         this.spillingChannel = createSpillingChannel();
      
         ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
         FileUtils.writeCompletely(this.spillingChannel, toWrite);
      }
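      To make the trigger concrete, here is a minimal sketch of the check, assuming the threshold is 5 MiB (5 * 1024 * 1024 bytes), which is what the "5 MB" above refers to. The class `SpillThresholdDemo` and method `needsSpilling` are hypothetical and not Flink code; only the name `THRESHOLD_FOR_SPILLING` and the strict `>` comparison come from the snippet above. Every record that passes this check needs a working temp dir.

```java
// Hypothetical illustration, not Flink code: decide whether a record of the
// given length would be spilled to a temp file instead of kept in memory.
class SpillThresholdDemo {
    // assumed value: 5 MiB, matching the "5 MB" in the description
    static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024;

    static boolean needsSpilling(int nextRecordLength) {
        // strict comparison, as in the deserializer snippet
        return nextRecordLength > THRESHOLD_FOR_SPILLING;
    }

    public static void main(String[] args) {
        System.out.println(needsSpilling(1024));            // false: small record stays in memory
        System.out.println(needsSpilling(5 * 1024 * 1024)); // false: exactly at the threshold
        System.out.println(needsSpilling(6 * 1024 * 1024)); // true: large record goes to a temp dir
    }
}
```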
      

      The temp dir is picked at random from all `tempDirs`. In YARN mode, each `tempDir` usually corresponds to one physical hard disk.

       

      In my opinion, if a hard disk is damaged, the TaskManager should pick another disk (temp dir) for the spilling channel rather than throw an IOException, which makes the Flink job restart over and over again.
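      A rough estimate shows why the restarts keep repeating. Assume each spill picks one of the n temp dirs uniformly at random (as `rnd.nextInt(tempDirs.length)` does), that k of them sit on damaged disks, and that spills are independent (a simplifying assumption):

```latex
P(\text{a spill fails}) = \frac{k}{n}, \qquad
P(\text{all } m \text{ spills succeed}) = \left(1 - \frac{k}{n}\right)^{m}
```

      For a hypothetical TaskManager with one damaged disk out of 12 (k = 1, n = 12) and m = 100 large records, the job gets through with probability (11/12)^100 ≈ 1.7 × 10^-4, so after every restart it almost certainly fails again.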

      We could simply change `SpillingAdaptiveSpanningRecordDeserializer` like this:

      // SpillingAdaptiveSpanningRecordDeserializer.java
      // (requires org.apache.commons.lang3.ArrayUtils and java.util.Arrays)
      private FileChannel createSpillingChannel() throws IOException {
         if (spillFile != null) {
            throw new IllegalStateException("Spilling file already exists.");
         }
      
         // try to find a unique file name for the spilling channel
         int maxAttempts = 10;
         String[] tempDirs = this.tempDirs;
         for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int dirIndex = rnd.nextInt(tempDirs.length);
            String directory = tempDirs[dirIndex];
            spillFile = new File(directory, randomString(rnd) + ".inputchannel");
            try {
               if (spillFile.createNewFile()) {
                  return new RandomAccessFile(spillFile, "rw").getChannel();
               }
            } catch (IOException e) {
               // no other temp dir left to try: give up
               if (tempDirs.length <= 1) {
                  throw e;
               }
               LOG.warn("Caught an IOException when creating spill file in {}. Attempt {}", directory, attempt, e);
               // skip the failing directory in the remaining attempts
               tempDirs = (String[]) ArrayUtils.remove(tempDirs, dirIndex);
            }
         }
      
         throw new IOException(
               "Could not find a unique file channel name in '" + Arrays.toString(tempDirs)
                     + "' for spilling large records during deserialization.");
      }
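      The proposed failover can be tried outside Flink with the standalone sketch below. It is not Flink code: the class `SpillDirFailover` and method `createSpillFile` are hypothetical, it uses `java.nio.file.Files.createFile` in place of `File.createNewFile`, and it simulates a damaged disk with a directory that does not exist (a real broken disk may fail differently, for example with an I/O error or a read-only file system). Name collisions keep the directory; any other IOException drops it for the remaining attempts.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

// Hypothetical standalone sketch, not Flink's API: create a spill file in a
// randomly chosen temp dir, dropping dirs that fail with an IOException.
class SpillDirFailover {

    static Path createSpillFile(List<Path> tempDirs, Random rnd, int maxAttempts) throws IOException {
        // work on a copy so the caller's list is not modified
        List<Path> candidates = new ArrayList<>(tempDirs);
        IOException lastError = null;
        for (int attempt = 0; attempt < maxAttempts && !candidates.isEmpty(); attempt++) {
            int dirIndex = rnd.nextInt(candidates.size());
            Path file = candidates.get(dirIndex).resolve(UUID.randomUUID() + ".inputchannel");
            try {
                // existence check and creation are a single atomic operation
                return Files.createFile(file);
            } catch (FileAlreadyExistsException e) {
                // name collision: keep the dir and retry with a new name
            } catch (IOException e) {
                // broken dir: remember the error and stop using this dir
                lastError = e;
                candidates.remove(dirIndex);
            }
        }
        IOException ex = new IOException("Could not create a spill file in " + tempDirs);
        if (lastError != null) {
            ex.addSuppressed(lastError);
        }
        throw ex;
    }

    public static void main(String[] args) throws IOException {
        Path good = Files.createTempDirectory("good");
        // a directory that does not exist stands in for a damaged disk
        Path broken = good.resolveSibling("does-not-exist-" + UUID.randomUUID());
        Path spill = createSpillFile(List.of(broken, good), new Random(42), 10);
        System.out.println(spill.getParent().equals(good)); // prints true
        Files.delete(spill);
        Files.delete(good);
    }
}
```

      Only when every candidate dir has failed (or all attempts hit name collisions) does the caller see an IOException, which matches the "no tempDir left to try" branch of the proposal.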
      

      Attachments

        1. flink_disk_error.png
          135 kB
          Kai Chen


          People

            yuchuanchen Kai Chen
            yuchuanchen Kai Chen
            Votes: 0
            Watchers: 5
