  1. Hadoop Map/Reduce
  2. MAPREDUCE-6238

MR2 can't run local jobs with -libjars command options which is a regression from MR1


Details

    • Reviewed

    Description

      MR2 can't run local jobs with the -libjars command option, which is a regression from MR1.
      When an MR2 job is run with -jt local and -libjars, the job fails with java.io.FileNotFoundException: File does not exist: hdfs://XXXXXXXXXXXXXXX.jar.
      The same command works in MR1.
      I found the following problems:
      1.
      When MR2 runs a local job using LocalJobRunner
      from JobSubmitter, JobSubmitter#jtFs is the local filesystem,
      so copyRemoteFiles returns early
      because the source and destination filesystems are the same:

          if (compareFs(remoteFs, jtFs)) {
            return originalPath;
          }
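
To illustrate why the early return is taken for a local job, here is a minimal sketch of a same-filesystem check built only on java.net.URI. It is a simplified stand-in and not Hadoop's actual compareFs; the class name SameFsCheck, the method sameFileSystem, and the namenode:8020 address are hypothetical.

```java
import java.net.URI;

final class SameFsCheck {
    // Simplified assumption: two URIs belong to the same filesystem when
    // their scheme, host and port all match.
    static boolean sameFileSystem(URI a, URI b) {
        String schemeA = a.getScheme();
        String schemeB = b.getScheme();
        if (schemeA == null || schemeB == null || !schemeA.equalsIgnoreCase(schemeB)) {
            return false;
        }
        String hostA = a.getHost();
        String hostB = b.getHost();
        boolean sameHost = (hostA == null) ? hostB == null : hostA.equalsIgnoreCase(hostB);
        return sameHost && a.getPort() == b.getPort();
    }

    public static void main(String[] args) {
        URI localFs = URI.create("file:///");
        URI libJar = URI.create("file:///tmp/mylib.jar");
        URI hdfs = URI.create("hdfs://namenode:8020/"); // hypothetical cluster address

        // With -jt local, source and destination are both local, so no copy is needed.
        System.out.println("local vs local: " + sameFileSystem(libJar, localFs));
        // Against a cluster filesystem the check fails and a copy would happen.
        System.out.println("local vs hdfs:  " + sameFileSystem(libJar, hdfs));
    }
}
```

In the local case the check succeeds, which corresponds to copyRemoteFiles handing back originalPath unchanged.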
      

      The following code in JobSubmitter.java
      then tries to add the destination file to DistributedCache, which introduces a bug for local jobs.

              Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
              DistributedCache.addFileToClassPath(
                  new Path(newPath.toUri().getPath()), conf);
      

      Because new Path(newPath.toUri().getPath()) drops the filesystem information from newPath, the file added to DistributedCache is qualified against the default filesystem URI (hdfs), as the following code shows. This causes the
      FileNotFoundException when the file is accessed later in
      determineTimestampsAndCacheVisibilities.

        public static void addFileToClassPath(Path file, Configuration conf)
          throws IOException {
          addFileToClassPath(file, conf, file.getFileSystem(conf));
        }
        public static void addFileToClassPath
                 (Path file, Configuration conf, FileSystem fs)
              throws IOException {
          String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
          conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
                   : classpath + "," + file.toString());
          URI uri = fs.makeQualified(file).toUri();
          addCacheFile(uri, conf);
        }
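
The loss of the scheme can be reproduced without Hadoop. The sketch below uses java.net.URI as a stand-in for Path: it keeps only the path component of a local jar URI and then qualifies it against a default filesystem, which is roughly what happens when a scheme-less Path is resolved through the configuration. The class and method names and the hdfs://namenode:8020/ address are hypothetical.

```java
import java.net.URI;

final class SchemeLossDemo {
    // Keep only the path component, mirroring newPath.toUri().getPath().
    static String stripToPath(URI qualified) {
        return qualified.getPath();
    }

    // Qualify a scheme-less absolute path against a default filesystem URI.
    static URI qualifyAgainstDefault(String path, URI defaultFs) {
        return defaultFs.resolve(path);
    }

    public static void main(String[] args) {
        URI localJar = URI.create("file:///tmp/mylib.jar");
        URI defaultFs = URI.create("hdfs://namenode:8020/"); // hypothetical default filesystem

        String bare = stripToPath(localJar);
        URI requalified = qualifyAgainstDefault(bare, defaultFs);

        // The jar that lives on local disk is now advertised as an hdfs file.
        System.out.println(bare);
        System.out.println(requalified);
    }
}
```

The re-qualified URI points at a file that was never copied to hdfs, which matches the FileNotFoundException reported above.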
      

      Compare this with the following MR1 code:

              Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
              DistributedCache.addFileToClassPath(
                new Path(newPath.toUri().getPath()), job, fs);
      

      This shows why MR1 doesn't have this issue:
      it passes the local filesystem into DistributedCache#addFileToClassPath instead of falling back to the default filesystem (hdfs).
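
The difference between the two call shapes can be sketched with a small model that uses java.net.URI in place of Hadoop's Path and FileSystem. This is an illustration under that assumption, not the actual patch; ExplicitFsDemo and its members are hypothetical names.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

final class ExplicitFsDemo {
    private final URI defaultFs;
    private final List<URI> cacheFiles = new ArrayList<>();

    ExplicitFsDemo(URI defaultFs) {
        this.defaultFs = defaultFs;
    }

    // MR2-style call shape: no filesystem given, so the default one is used.
    void addFileToClassPath(String path) {
        addFileToClassPath(path, defaultFs);
    }

    // MR1-style call shape: the caller says which filesystem the path lives on.
    void addFileToClassPath(String path, URI fs) {
        cacheFiles.add(fs.resolve(path));
    }

    List<URI> cacheFiles() {
        return cacheFiles;
    }

    public static void main(String[] args) {
        ExplicitFsDemo demo = new ExplicitFsDemo(URI.create("hdfs://namenode:8020/")); // hypothetical
        demo.addFileToClassPath("/tmp/mylib.jar");                         // qualified as hdfs
        demo.addFileToClassPath("/tmp/mylib.jar", URI.create("file:///")); // stays local
        for (URI u : demo.cacheFiles()) {
            System.out.println(u.getScheme() + " -> " + u.getPath());
        }
    }
}
```

Passing the filesystem explicitly keeps a scheme-less path tied to the filesystem it came from, which is the behavior the MR1 snippet relies on.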
      2.
      Another incompatible change in MR2 is in LocalDistributedCacheManager#setup:

          // Find which resources are to be put on the local classpath
          Map<String, Path> classpaths = new HashMap<String, Path>();
          Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
          if (archiveClassPaths != null) {
            for (Path p : archiveClassPaths) {
              FileSystem remoteFS = p.getFileSystem(conf);
              p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
                  remoteFS.getWorkingDirectory()));
              classpaths.put(p.toUri().getPath().toString(), p);
            }
          }
          Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
          if (fileClassPaths != null) {
            for (Path p : fileClassPaths) {
              FileSystem remoteFS = p.getFileSystem(conf);
              p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
                  remoteFS.getWorkingDirectory()));
              classpaths.put(p.toUri().getPath().toString(), p);
            }
          }
      

      The similar code in MR1 is in TaskDistributedCacheManager#makeCacheFiles:

              Map<String, Path> classPaths = new HashMap<String, Path>();
              if (paths != null) {
                for (Path p : paths) {
                  classPaths.put(p.toUri().getPath().toString(), p);
                }
              }
      

      I think we don't need to call remoteFS.resolvePath to get the class path;
      we can use the class paths from DistributedCache.getFileClassPaths directly.
      Also, p.toUri().getPath().toString() removes the filesystem information (scheme), and only the keySet of classpaths is used (the values of classpaths are not used).
      It is better to do the same in MR2 to maintain backward compatibility with MR1.
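
To make the point about the map keys concrete, here is a minimal sketch that builds the same kind of map with java.net.URI standing in for Path. The class name, method name, and sample URIs are hypothetical; the sketch only shows that the key is the bare path, so the scheme never reaches the keySet.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ClasspathKeysDemo {
    // Key each entry by its path component only, as
    // classpaths.put(p.toUri().getPath().toString(), p) does.
    static Map<String, URI> buildClasspaths(List<URI> fileClassPaths) {
        Map<String, URI> classpaths = new LinkedHashMap<>();
        for (URI p : fileClassPaths) {
            classpaths.put(p.getPath(), p);
        }
        return classpaths;
    }

    public static void main(String[] args) {
        List<URI> paths = Arrays.asList(
                URI.create("file:///tmp/mylib.jar"),
                URI.create("hdfs://namenode:8020/libs/other.jar")); // hypothetical entries
        // Only the keys are consulted later, and they carry no scheme.
        for (String key : buildClasspaths(paths).keySet()) {
            System.out.println(key);
        }
    }
}
```

Since the keys are identical whether or not the path was resolved against a filesystem first, the extra resolution step does not change what ends up on the local classpath in this model.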

      Attachments

        1. MAPREDUCE-6238.000.patch
          6 kB
          Zhihai Xu


            People

              zxu Zhihai Xu