Pig / PIG-652

Need to give user control of OutputFormat

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.2.0
    • Fix Version/s: 0.3.0
    • Component/s: impl
    • Labels: None
    • Hadoop Flags: Incompatible change

      Description

      Pig currently allows users some control over InputFormat via the Slicer and Slice interfaces. It does not allow any control over the OutputFormat and RecordWriter interfaces; it only allows the user to implement a storage function that controls how the data is serialized. For Hadoop tables, we will need to allow custom OutputFormats that prepare the output information and objects needed by a Table store function.

      1. PIG-652-v5.patch (29 kB) - Gunther Hagleitner
      2. PIG-652-v4.patch (22 kB) - Pradeep Kamath
      3. PIG-652-v3.patch (22 kB) - Pradeep Kamath
      4. PIG-652-v2.patch (14 kB) - Pradeep Kamath
      5. PIG-652.patch (14 kB) - Pradeep Kamath

        Activity

        Pradeep Kamath added a comment -

        This patch was committed - marking as resolved.

        Gunther Hagleitner added a comment -

        The v5 patch includes the changes needed for multiquery.

        Hong Tang added a comment -

        Can we get this patch committed?

        Pradeep Kamath added a comment (edited) -

        Attaching a new patch - the only difference is shown below
        (the old code is the first input in the diff):

        < --- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java  (revision 747112)
        ---
        > --- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java  (revision 748740)
        146c146
        < +            if(sPrepClass != null && sPrepClass.isInstance(OutputFormat.class)) {
        ---
        > +            if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
        
        

        The new check correctly tests whether the class supplied by the StoreFunc is an OutputFormat implementation.
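
        For reference, a minimal standalone sketch of the difference between the two checks - TextOutputFormat is used here purely as an example of an OutputFormat implementation a StoreFunc might return:

        import org.apache.hadoop.mapred.OutputFormat;
        import org.apache.hadoop.mapred.TextOutputFormat;

        public class OutputFormatCheckDemo {
            public static void main(String[] args) {
                // A class a StoreFunc might hand back; any OutputFormat
                // implementation behaves the same way here.
                Class<?> sPrepClass = TextOutputFormat.class;

                // Old check: asks whether the Class object OutputFormat.class is an
                // instance of sPrepClass - false for any OutputFormat implementation,
                // so the branch was never taken.
                System.out.println(sPrepClass.isInstance(OutputFormat.class));        // false

                // New check: asks whether sPrepClass is OutputFormat itself or a
                // subtype of it - true for TextOutputFormat.
                System.out.println(OutputFormat.class.isAssignableFrom(sPrepClass));  // true
            }
        }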

        Pradeep Kamath added a comment -

        Attached a new version of the patch. Changes include:
        1) Included the MapRedUtil.java source file which was missing in the previous patch
        2) Fixed a few issues which were uncovered in some tests
        3) Regenerated the patch with the latest svn revision

        Pradeep Kamath added a comment -

        The patch is currently for code review only - it should not be committed yet.

        Pradeep Kamath added a comment -

        Attached a new version which addresses the comment regarding having a serialVersionUID in StoreConfig since it is Serializable. Also removed a redundant import from StoreFunc.java.

        Moving the string constants to a separate PropertyKeys.java file would be good, but to be useful it would require all existing free-standing constants to be moved there - this would be better done in a separate JIRA as suggested.

        Having schemas in all PhysicalOperators might be useful but would require all the visitors (optimizers etc.) which run after LogToPhyTranslationVisitor and which introduce new PhysicalOperators to also generate schemas for them. Likewise these visitors would have to handle schema changes on operators they modify - this might be better pursued in a different JIRA if found to be worthwhile.
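
        To illustrate the serialVersionUID point above, a minimal sketch of a Serializable StoreConfig-style holder - the fields, accessors and UID value are illustrative, not the actual StoreConfig source in the patch:

        import java.io.Serializable;

        import org.apache.pig.impl.logicalLayer.schema.Schema;

        // Sketch only: a Serializable holder for the store location and schema,
        // carrying an explicit serialVersionUID as the review requested.
        public class StoreConfigSketch implements Serializable {
            private static final long serialVersionUID = 1L;   // value is illustrative

            private final String location;
            private final Schema schema;

            public StoreConfigSketch(String location, Schema schema) {
                this.location = location;
                this.schema = schema;
            }

            public String getLocation() { return location; }

            public Schema getSchema() { return schema; }
        }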

        Santhosh Srinivasan added a comment -

        Review comments:

        1. In JobControlCompiler, the use of a static final string is a good idea to remove the use of string constants. We should probably make this change across the board as part of a new JIRA.

        2. Will schemas be useful for other operators and not just POStore?

        3. Since StoreConfig implements Serializable, it should also have a static final long serialVersionUID.

        The rest of the code is good. I have not run any of the tests; this was a pure code review.

        Pradeep Kamath added a comment -

        Submitting a patch with a few changes to the way this will work. Very soon we will have the ability to store multiple outputs in the map or reduce phase of a job (https://issues.apache.org/jira/browse/PIG-627). In that scenario the OutputFormat will still need to be able to get a handle on the corresponding StoreFunc, location and schema to use for the particular output it is trying to write. To enable this, a utility class - MapRedUtil - is being introduced with static methods which take a JobConf and return these pieces of information. When PIG-627 is implemented, these utility methods will hide the internal Pig implementation that maps the multiple stores to the corresponding StoreFunc, location and schema.

        The new method in StoreFunc proposed at the beginning of this issue will still be used to ask the StoreFunc if it will provide an OutputFormat implementation.
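
        As a rough sketch of how a custom OutputFormat might use such a utility on the backend - the getStoreConfig/getStoreFunc calls are assumptions based on the description above and are therefore only shown as comments:

        import java.io.IOException;

        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.OutputFormat;
        import org.apache.hadoop.mapred.RecordWriter;
        import org.apache.hadoop.util.Progressable;
        import org.apache.pig.data.Tuple;

        // Sketch only: a table-style OutputFormat that recovers the store
        // information Pig placed in the JobConf.
        public class MyTableOutputFormat implements OutputFormat<Object, Tuple> {

            public RecordWriter<Object, Tuple> getRecordWriter(FileSystem ignored,
                    JobConf job, String name, Progressable progress) throws IOException {
                // Hypothetical: StoreConfig cfg = MapRedUtil.getStoreConfig(job);
                // Hypothetical: StoreFunc func = MapRedUtil.getStoreFunc(job);
                // ... open the table at cfg's location with cfg's schema and return
                // a RecordWriter that hands each Tuple to the table writer ...
                throw new UnsupportedOperationException("sketch only");
            }

            public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
                // Nothing to validate in this sketch.
            }
        }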

        Hong Tang added a comment -

        One more thing is still not clear to me. StoreFunc does not implement any serialization interface, and it depends on an all-string constructor to properly construct the object. How does my customized TableStoreFunc instance convey this information to Pig?

        Hong Tang added a comment -

        You probably want to provide a utility method for getting the StoreFunc back from a JobConf, instead of forcing people to copy/paste internal Pig code...

        Alan Gates added a comment -

        Take a look at PigOutputFormat.java line 58 to see how that's handled.

        Hong Tang added a comment -

        So the StoreFunc can be used to keep state. One more missing link: how can my OutputFormat class get hold of my StoreFunc instance?

        Alan Gates added a comment -

        Sorry, the missing piece is that Pig serializes the StoreFunc on the front end and deserializes it on the back end, so it can store things in member variables and then have them available on the back end. For a more elegant proposal, see PIG-602. We could add that as part of these changes so that your store function could use it as a transport mechanism.
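
        A minimal sketch of that pattern - the class, its constructor argument and the method set (bindTo/putNext/finish plus the getStorePreparationClass method discussed in this issue) are illustrative assumptions about the StoreFunc interface, not code from the patch:

        import java.io.IOException;
        import java.io.OutputStream;

        import org.apache.pig.StoreFunc;
        import org.apache.pig.data.Tuple;

        // Sketch only: state captured on the front end (the constructor argument)
        // lives in a member variable; because Pig carries the StoreFunc to the
        // back end, the value is available again when tuples are written.
        public class HostPortStoreFunc implements StoreFunc {
            private final String hostPort;   // front-end state, e.g. "host:9999"
            private OutputStream out;

            public HostPortStoreFunc(String hostPort) {
                this.hostPort = hostPort;
            }

            public void bindTo(OutputStream os) throws IOException {
                // Back end: hostPort is available here; a real implementation could
                // open a socket to hostPort instead of using the supplied stream.
                this.out = os;
            }

            public void putNext(Tuple t) throws IOException {
                out.write((t.toString() + "\n").getBytes());
            }

            public void finish() throws IOException {
                out.flush();
            }

            public Class getStorePreparationClass() throws IOException {
                return null;   // null: fall back to Pig's default output handling
            }
        }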

        Hong Tang added a comment -

        From OutputFormat.java:

          /** 
           * Get the {@link RecordWriter} for the given job.
           *
           * @param ignored
           * @param job configuration for the job whose output is being written.
           * @param name the unique name for this part of the output.
           * @param progress mechanism for reporting progress while writing to file.
           * @return a {@link RecordWriter} to write the output for the job.
           * @throws IOException
           */
          RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                             String name, Progressable progress)
          throws IOException;
        
        

        It does not say that the name is the full path of the file.

        Hong Tang added a comment -

        Again, I might be missing something obvious to you. My understanding is that any OutputFormat class would be constructed via the default constructor, and all its state would have to be passed from the JobClient side to the actual task side through the JobConf. Here is how people create a basic table directly through BasicTableOutputFormat:

        At the JobClient side:

        jobConf.setOutputFormat(BasicTableOutputFormat.class);
        Path outPath = new Path("path/to/the/BasicTable");
        BasicTableOutputFormat.setOutputPath(jobConf, outPath);
        String schema = new String("Name, Age, Salary, BonusPct");
        BasicTableOutputFormat.setSchema(jobConf, schema);
        
        static class MyReduceClass implements Reducer<K, V, BytesWritable, Tuple> {
          Tuple outRow;
          int idxName, idxAge, idxSalary, idxBonusPct;
         
          public void configure(JobConf job) {
            Schema outSchema = BasicTableOutputFormat.getSchema(job);
            outRow = TypesUtils.createTuple(outSchema);
            idxName = outSchema.getColumnIndex("Name");
            idxAge = outSchema.getColumnIndex("Age");
            idxSalary = outSchema.getColumnIndex("Salary");
            idxBonusPct = outSchema.getColumnIndex("BonusPct");
          }
          public void reduce(K key, Iterator<V> values,         
              OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
              throws IOException {
            String name;
            int age, salary;
            double bonusPct;
        
                // ... Determine individual field values of the row to be inserted ...
        
            try {
              outRow.set(idxName, name);
              outRow.set(idxAge, new Integer(age));
              outRow.set(idxSalary, new Integer(salary));
              outRow.set(idxBonusPct, new Double(bonusPct));
              output.collect(new BytesWritable(name.getBytes()), outRow);
            } catch (ExecException e) {
              // should never happen
            }
          }
         
          public void close() throws IOException { /* no-op */  } 
        }
        
        Alan Gates added a comment -

        As for getting the schema in the OutputFormat, it will be the job of your StoreFunc to store the schema somewhere your OutputFormat can retrieve it.

        The path passed to the OutputFormat is the full path of the file to be written, not just part-001.

        Hong Tang added a comment -

        I might be missing something. How can the OutputFormat class retrieve the schema information? The OutputFormat is constructed with its default constructor, and then its getRecordWriter is called with a name like part-001 or part-002, but not with the path to the basic table.

        Alan Gates added a comment -

        Sorry, forgot to include the schema part. A second function should be added to the StoreFunc interface:

            /**
             * Specify a schema to be used in storing the data.  This can be used by
             * store functions that store the data in a self describing format.  The
             * store function is free to ignore this if it cannot use it.
             * @param schema of the output data
             */
            public void setStorageSchema(Schema s);
        

        This function would be called during query planning. The StoreFunc can then take the responsibility of storing away the schema so that it (or its associated OutputFormat) can access it on the backend. This schema will also include the sortedness of the data.

        As for making the JobConf and path available, those are passed to OutputFormat.getRecordWriter, so those implementing their own OutputFormats will have access to them. They can then pass them on to their store functions as they wish.

        For compression, pig right now has no way to communicate compression types other than file endings (.bz is the only one we support at the moment). This is a kludge, but I don't want to propose a whole way to coherently communicate compression in pig at the moment. So I vote that we stay with this for the time being.
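
        A minimal sketch of how a store function might use the proposed hook, assuming setStorageSchema is added to StoreFunc as shown above; only the schema-related members are shown and the remaining StoreFunc methods are omitted:

        import org.apache.pig.impl.logicalLayer.schema.Schema;

        // Sketch only: a self-describing store function capturing the schema at
        // query-planning time via the proposed setStorageSchema() hook.
        public class SelfDescribingStoreFuncSketch {
            private Schema storageSchema;   // set on the front end, carried to the back end

            // Proposed hook, called during query planning.
            public void setStorageSchema(Schema s) {
                this.storageSchema = s;
            }

            // On the back end the schema can be written out as header/metadata
            // before the first tuple, or handed to the associated OutputFormat,
            // so the stored data describes itself.
            public Schema getStorageSchema() {
                return storageSchema;
            }
        }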

        Alan Gates added a comment -

        In response to Ben's question in comment https://issues.apache.org/jira/browse/PIG-652?focusedCommentId=12671009#action_12671009 about the motivating scenario: the issue is that right now we pass an already opened output stream to the store function. This, and the fact that a fair amount of setup is done in the PigRecordWriter, forces all stores to be done to an HDFS text file. If a user wants to store to a different type of HDFS file (like Table) or to a non-HDFS store (such as a database, hbase, a socket, whatever), there is no option for that. We don't want to export all of that setup to the StoreFunc; the RecordWriter is the right place to do it.

        Hong Tang added a comment -

        How do I get the schema information from Pig? I thought you would put the schema in the JobConf and pass it to the customized OutputFormat class to create RecordWriter.

        Benjamin Reed added a comment -

        Can you explain the motivating scenario in more detail? Is it just to avoid creating the output stream?

        @Hong, you aren't going to be getting the schema from the Hadoop JobConf; you'll be getting it from Pig. Since a Pig job may involve multiple Hadoop jobs, you can't count on passing stuff through the Hadoop configuration.

        Hong Tang added a comment -

        Since this API is supposed to provide backend-specific output classes, shouldn't the API take a parameter describing the backend?

        For the MR backend, would the returned class implement OutputFormat<Text, Tuple>? Also, the keys in the JobConf object describing the path, schema, compression, etc. need to be made public.

        Alan Gates added a comment -

        I propose that we add a method to the StoreFunc interface:

            /**
             * Specify a backend specific class to use to prepare for
             * storing output.  In the Hadoop case, this can return an
             * OutputFormat that will be used instead of PigOutputFormat.  The 
             * framework will call this function and if a Class is returned
             * that implements OutputFormat it will be used.
             * @return Backend specific class used to prepare for storing output.
             * @throws IOException if the class does not implement the expected
             * interface(s).
             */
            public Class getStorePreparationClass() throws IOException;
        

        This way we are not forced to write a whole Pig copy of the OutputFormat and RecordWriter interfaces (the way Slicer and Slice copy InputFormat, InputSplit, and RecordReader) while still avoiding importing Hadoop classes into our interface. It also avoids forcing the StoreFunc to also be a RecordWriter (the way LoadFunc has to implement Slicer).

        The downside of this is that it does not allow Pig Latin to be extended with a construct like:

        store A using MyStoreFunc() using format MyOutputFormat()
        

        There would be an advantage to this. For example, if one wanted to serialize tuples over a socket, you might still want to use PigStorage but create a SocketOutputFormat function. In the currently proposed interface you could still accomplish this by writing a StoreFunc that subclasses PigStorage and implements getStorePreparationClass(), but this is less elegant.

        As far as I know no one is currently asking for the ability to specify an OutputFormat separately from the StoreFunc, and doing so would necessitate creating Pig copies of OutputFormat and RecordWriter. So rather than create a lot of extra interfaces for functionality no one is requesting, I propose this simpler solution. If, in the future, we choose to allow separating the two, we would still want a StoreFunc to be able to specify its OutputFormat, so the proposed functionality would not be deprecated.
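
        A rough sketch of the subclass-PigStorage approach mentioned above; since SocketOutputFormat does not exist, a stock Hadoop OutputFormat stands in as a placeholder so the sketch compiles:

        import java.io.IOException;

        import org.apache.hadoop.mapred.TextOutputFormat;
        import org.apache.pig.builtin.PigStorage;

        // Sketch only: reuse PigStorage's tuple serialization but point Pig at a
        // custom OutputFormat through the proposed getStorePreparationClass() hook.
        public class SocketPigStorage extends PigStorage {

            public SocketPigStorage() {
                super();
            }

            public SocketPigStorage(String delimiter) {
                super(delimiter);
            }

            public Class getStorePreparationClass() throws IOException {
                // In practice this would return the custom class, e.g. a hypothetical
                // SocketOutputFormat; TextOutputFormat is only a stand-in here.
                return TextOutputFormat.class;
            }
        }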


          People

          • Assignee: Pradeep Kamath
          • Reporter: Alan Gates
          • Votes: 0
          • Watchers: 1
