Pig / PIG-1752

UDFs should be able to indicate files to load in the distributed cache

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: impl
    • Labels:
      None
    • Release Note:
      We add a new method to EvalFunc:
      public List<String> getCacheFiles();

      A UDF can override this method to return a list of HDFS files that need to be shipped to the distributed cache. Inside the EvalFunc, the UDF can assume that these files already exist in the distributed cache.

      For example:
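      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.pig.EvalFunc;
      import org.apache.pig.data.Tuple;

      public class Udfcachetest extends EvalFunc<String> {

          @Override
          public String exec(Tuple input) throws IOException {
              // "./smallfile" is the symlink name declared in getCacheFiles()
              try (BufferedReader d = new BufferedReader(new FileReader("./smallfile"))) {
                  return d.readLine();
              }
          }

          @Override
          public List<String> getCacheFiles() {
              List<String> list = new ArrayList<String>(1);
              // HDFS path, then '#', then the symlink name seen by the task
              list.add("/user/pig/tests/data/small#smallfile");
              return list;
          }
      }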

      a = load '1.txt';
      b = foreach a generate Udfcachetest(*);
      dump b;
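
The example above opens ./smallfile on every call to exec. If the file is used as a lookup table, it may be cheaper to read it once per task and reuse the contents. The following is a minimal sketch of that idea in plain Java; CachedLines is a hypothetical helper that is not part of Pig, and it assumes the file is small enough to hold in memory and is encoded as UTF-8.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper, not part of Pig: reads a symlinked cache file once
// and keeps its lines in memory for later calls.
final class CachedLines {
    private final String localPath;
    private List<String> lines; // null until the first call to get()

    CachedLines(String localPath) {
        this.localPath = localPath;
    }

    // Returns the file's lines, reading from disk only on the first call.
    List<String> get() throws IOException {
        if (lines == null) {
            lines = Files.readAllLines(Paths.get(localPath), StandardCharsets.UTF_8);
        }
        return lines;
    }
}
```

A UDF could hold a field such as new CachedLines("./smallfile") and call get() from exec, so the read happens on the first tuple only.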

      Description

      Currently there is no way for a UDF to load a file into the distributed cache.

          Activity

          Daniel Dai added a comment -

          Do we want to add it to LoadFunc/StoreFunc as well?

          Alan Gates added a comment -

          This patch changes the EvalFunc API to allow UDFs to declare a list of files they want to put in the distributed cache. It adds a new method:

              /**
               * Allow a UDF to specify a list of files it would like placed in the distributed
               * cache.  These files will be put in the cache for every job the UDF is used in.
               * The default implementation returns null.
               * @return A list of files
               */
              public List<String> getCacheFiles() {
                  return null;
              }
              
          

          This change is backward compatible since EvalFunc is an abstract class and the default implementation returns null.

          Any files returned by getCacheFiles are captured and placed in the physical plan during logical->physical translation. The JobControlCompiler then visits each UDF and adds the files returned to the list of files to load into the distributed cache for this job.
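
The collection step described above can be sketched in plain Java. This is not the JobControlCompiler code; CacheFileSource and CacheFileCollector are hypothetical names used only for illustration, and dropping duplicate entries is an assumption rather than something the patch is stated to do. The point it shows is that a caller has to tolerate the default null return.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the one EvalFunc method that matters here.
interface CacheFileSource {
    List<String> getCacheFiles();
}

// Hypothetical collector, not Pig's implementation.
final class CacheFileCollector {
    // Gathers the entries declared by every UDF in a job, skipping UDFs
    // that keep the default null return and dropping duplicate entries
    // while preserving first-seen order.
    static List<String> collect(List<? extends CacheFileSource> udfs) {
        Set<String> files = new LinkedHashSet<String>();
        for (CacheFileSource udf : udfs) {
            List<String> declared = udf.getCacheFiles();
            if (declared != null) {
                files.addAll(declared);
            }
        }
        return new ArrayList<String>(files);
    }
}
```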

          No special handling is provided for the files. Users must ensure that the files are already on HDFS. Each filename should be of the form:

          hdfs://namenode/path#symlink

          where symlink is the name under which the file will be linked in the task's local directory. The UDF can then access the file in the backend by opening that symlink as a local file.
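
To make the path#symlink form concrete, here is a small sketch that splits an entry into its two parts using java.net.URI. CacheFileSpec is a hypothetical helper, not something Pig provides, and rejecting entries that lack a #symlink part is this sketch's own choice based on the form described above.

```java
import java.net.URI;

// Hypothetical helper, not part of Pig: splits a cache-file entry into its parts.
final class CacheFileSpec {
    final String path;     // location of the file on HDFS
    final String symlink;  // name the task sees in its local directory

    CacheFileSpec(String path, String symlink) {
        this.path = path;
        this.symlink = symlink;
    }

    // Parses "hdfs://namenode/path#symlink" or "/path#symlink".
    static CacheFileSpec parse(String entry) {
        URI uri = URI.create(entry);
        String fragment = uri.getFragment();
        if (fragment == null || fragment.isEmpty()) {
            throw new IllegalArgumentException("missing #symlink in: " + entry);
        }
        return new CacheFileSpec(uri.getPath(), fragment);
    }
}
```

For the entry used in the release note, /user/pig/tests/data/small#smallfile, the symlink part is smallfile, which is why the UDF opens ./smallfile.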

          Alan Gates added a comment -

          One other note. I didn't include any unit tests with this patch. I don't know how to test it in the unit tests since the distributed cache isn't used in local mode. I've tested it on a cluster. Any thoughts on how to include tests for this in the unit tests are welcome.

          Daniel Dai added a comment -

          +1. I don't think there is an easy way to add a unit test.

          Also we can open another Jira for LoadFunc.

          Alan Gates added a comment -

          This may make sense to add to the load func as well, but I agree we can do that separately if we decide it would be useful.

          Alan Gates added a comment -

          [exec] -1 overall.
          [exec]
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec]
          [exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
          [exec] Please justify why no tests are needed for this patch.
          [exec]
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec]
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec]
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec]
          [exec] -1 release audit. The applied patch generated 450 release audit warnings (more than the trunk's current 449 warnings).
          [exec]
          [exec]

          I discussed the missing tests above.

          The extra release audit warning is due to a .orig file left over from the merge.

          All unit tests passed as well.

          Alan Gates added a comment -

          Patch checked in.


            People

            • Assignee: Alan Gates
            • Reporter: Alan Gates
            • Votes: 0
            • Watchers: 2
