Flink / FLINK-25453

LocalFileSystem#listStatus throws FileNotFoundException when a file is deleted concurrently in the directory


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.2
    • Fix Version/s: None
    • Component/s: API / Core
    • Labels: None

    Description

      Add the following test case to LocalFileSystemTest to reproduce this issue.

      @Test
      public void myTest() throws Exception {
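          temporaryFolder.create();
          // Each thread writes its own file into the shared folder, lists the folder,
          // and then deletes its file.
          Runnable r =
                  () -> {
                      try {
                          Path path = new Path(temporaryFolder.getRoot() + "/" + UUID.randomUUID());
                          FileSystem fs = path.getFileSystem();
                          try (FSDataOutputStream out =
                                  fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
                              OutputStreamWriter writer =
                                      new OutputStreamWriter(out, StandardCharsets.UTF_8);
                              writer.write("test");
                              writer.flush();
                          }
                          // Random pauses let one thread's listing overlap other threads' deletions.
                          Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                          // Throws FileNotFoundException if another thread deletes its file
                          // while this listing is in progress.
                          fs.listStatus(new Path(temporaryFolder.getRoot().toString()));
                          Thread.sleep(ThreadLocalRandom.current().nextInt(100));
                          fs.delete(path, false);
                      } catch (Exception e) {
                          // Exceptions are only printed, so the test itself does not fail;
                          // check the output for the stack trace.
                          e.printStackTrace();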
                      }
                  };
          List<Thread> threads = new ArrayList<>();
          for (int i = 0; i < 100; i++) {
              Thread thread = new Thread(r);
              thread.start();
              threads.add(thread);
          }
          for (Thread thread : threads) {
              thread.join();
          }
      }
      

      Exception stack trace:

      java.io.FileNotFoundException: File /var/folders/y9/hqm_j18s105g5n8_xq00rd7c0000gp/T/junit8680341925762938456/f3b7f8a3-7092-464a-af7e-e7f8465c041d does not exist or the user running Flink ('tsreaper') has insufficient permissions to access it.
      	at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
      	at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:167)
      	at org.apache.flink.core.fs.local.LocalFileSystemTest.lambda$myTest$0(LocalFileSystemTest.java:90)
      	at java.lang.Thread.run(Thread.java:748)
      

      This happens because listStatus is not atomic. LocalFileSystem first reads all file names in the directory and then queries the status of each file. If a file is deleted after its name has been read but before its status is queried, a FileNotFoundException is thrown.
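      The race can be made deterministic by performing the two steps by hand and deleting a file in between. The sketch below uses only standard java.nio, not Flink's LocalFileSystem; the class and method names are hypothetical, and Files.readAttributes stands in for the per-file status query (it reports a missing file with NoSuchFileException rather than FileNotFoundException).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical demo class: not part of Flink.
public class ListThenStatRace {

    /**
     * Replays the two steps of a non-atomic directory listing and deletes a
     * file in between. Returns true if querying the stale name failed.
     */
    public static boolean staleEntryFails() throws IOException {
        Path dir = Files.createTempDirectory("list-race");
        Path victim = Files.createFile(dir.resolve("victim.txt"));

        // Step 1: read all names in the directory (a snapshot).
        String[] names = dir.toFile().list();
        if (names == null) {
            throw new IOException("Cannot list " + dir);
        }

        // Simulate another thread deleting its file between the two steps.
        Files.delete(victim);

        // Step 2: query the status of each name from the snapshot.
        boolean failed = false;
        for (String name : names) {
            try {
                Files.readAttributes(dir.resolve(name), BasicFileAttributes.class);
            } catch (NoSuchFileException e) {
                failed = true; // the name was listed, but the file is gone
            }
        }

        Files.delete(dir); // the directory is empty again
        return failed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("stale entry failed: " + staleEntryFails());
    }
}
```

      On the default file system this should print "stale entry failed: true" on every run, without relying on thread timing.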

      Hadoop's RawLocalFileSystem handles this case by catching and ignoring the FileNotFoundException for each entry whose status can no longer be queried:

      for (int i = 0; i < names.length; i++) {
        try {
          // Assemble the path using the Path 3 arg constructor to make sure
          // paths with colon are properly resolved on Linux
          results[j] = getFileStatus(new Path(f, new Path(null, null, names[i])));
          j++;
        } catch (FileNotFoundException e) {
          // ignore the files not found since the dir list may have changed
          // since the names[] list was generated.
        }
      }
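      A minimal sketch of the same approach in plain Java, assuming a listing that first reads a names[] snapshot and then queries each entry, as described above. Entry, listStatus and statusOf here are hypothetical names, not Flink's API; a fix in LocalFileSystem#listStatus would presumably catch the FileNotFoundException from getFileStatus in the same place.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: not Flink's LocalFileSystem.
public class TolerantListing {

    /** Hypothetical stand-in for a file status: path, length, and directory flag. */
    public static final class Entry {
        public final Path path;
        public final long length;
        public final boolean isDir;

        Entry(Path path, long length, boolean isDir) {
            this.path = path;
            this.length = length;
            this.isDir = isDir;
        }
    }

    /** Lists dir, skipping entries that disappear before they are queried. */
    public static List<Entry> listStatus(Path dir) throws IOException {
        String[] names = dir.toFile().list();
        if (names == null) {
            throw new IOException("Cannot list " + dir);
        }
        return statusOf(dir, names);
    }

    /** Queries each name from an earlier listing; names whose files are gone are dropped. */
    public static List<Entry> statusOf(Path dir, String[] names) throws IOException {
        List<Entry> results = new ArrayList<>(names.length);
        for (String name : names) {
            Path child = dir.resolve(name);
            try {
                BasicFileAttributes attrs =
                        Files.readAttributes(child, BasicFileAttributes.class);
                results.add(new Entry(child, attrs.size(), attrs.isDirectory()));
            } catch (NoSuchFileException e) {
                // Deleted after the names were read: skip it, as Hadoop does.
            }
        }
        return results;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("tolerant-list");
        Files.createFile(dir.resolve("keep.txt"));
        Path gone = Files.createFile(dir.resolve("gone.txt"));

        String[] names = dir.toFile().list(); // snapshot contains both files
        Files.delete(gone);                   // simulated concurrent deletion
        for (Entry e : statusOf(dir, names)) {
            System.out.println(e.path.getFileName()); // only keep.txt survives
        }
    }
}
```

      Only the "entry vanished" case is tolerated; any other IOException still propagates. Judging from the exception message above, Flink's getFileStatus uses the same FileNotFoundException for missing files and for insufficient permissions, so simply catching it there would also skip unreadable entries.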
      


            People

              Assignee: Unassigned
              Reporter: Caizhi Weng (TsReaper)
              Votes: 0
              Watchers: 5
