PIG-953

Enable merge join in Pig to work with loaders and store functions which can internally index sorted data

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.6.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      Currently, the merge join implementation in Pig includes construction of an index on sorted data and use of that index to seek into the "right input" to perform the join efficiently. Some loaders (notably the Zebra loader) internally implement an index on sorted data and can perform this seek efficiently using their own index. So the use of the index needs to be abstracted in such a way that when the loader supports indexing, Pig uses it (indirectly, through the loader) and does not construct an index of its own.

      1. PIG-953.patch
        72 kB
        Pradeep Kamath
      2. PIG-953-2.patch
        72 kB
        Pradeep Kamath
      3. PIG-953-3.patch
        86 kB
        Pradeep Kamath
      4. PIG-953-4.patch
        86 kB
        Pradeep Kamath
      5. PIG-953-5.patch
        95 kB
        Pradeep Kamath
      6. PIG-953-6.patch
        95 kB
        Pradeep Kamath
      7. PIG-953-7.patch
        96 kB
        Pradeep Kamath
      8. PIG-953-8.patch
        98 kB
        Pradeep Kamath
      9. PIG-953-9.patch
        100 kB
        Pradeep Kamath
      10. PIG-953_missing_files.diff
        37 kB
        Dmitriy V. Ryaboy


          Activity

          Dmitriy V. Ryaboy added a comment -

          The attached diff was applied to svn but not posted to the jira. Combine it with patch-9 to get the full patch.

          Pradeep Kamath added a comment -

          I ran the test-patch process and junit tests on my local machine since the hudson queue was backed up. Here are the results; I have explained the reason for the javac warnings and release audit warnings in my previous comment.

          test-patch results
          ====================
          ....
              [exec] -1 overall.
              [exec]
              [exec]     +1 @author.  The patch does not contain any @author tags.
              [exec]
              [exec]     +1 tests included.  The patch appears to include 6 new or modified tests.
              [exec]
              [exec]     +1 javadoc.  The javadoc tool did not generate any warning messages.
              [exec]
              [exec]     -1 javac.  The applied patch generated 200 javac compiler warnings (more than the trunk's current 197 warnings).
              [exec]
              [exec]     +1 findbugs.  The patch does not introduce any new Findbugs warnings.
              [exec]
              [exec]     -1 release audit.  The applied patch generated 298 release audit warnings (more than the trunk's current 291 warnings).

          core unit test results
          ======================
          ...
              [junit] Running org.apache.pig.test.TestUnion
              [junit] Tests run: 3, Failures: 0, Errors: 0, Time elapsed: 44.03 sec

          test-contrib:

          BUILD SUCCESSFUL
          

          Patch committed to trunk

          Pradeep Kamath added a comment -
          • New patch addresses all javadoc, unit test and findbugs issues.
          • The release audit warnings are unrelated issues concerning HTML files, not code.
          • I tried suppressing the deprecation-related javac warnings in code, but it looks like there is an existing javac bug, so there is no way I am aware of to suppress them in code; we may need to live with these warnings until we move to the new Hadoop API.
          Olga Natkovich added a comment -

          +1 on the patch, assuming that all the failures and warnings from test-patch are addressed.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12423385/PIG-953-8.patch
          against trunk revision 830664.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 6 new or modified tests.

          -1 javadoc. The javadoc tool appears to have generated 1 warning message.

          -1 javac. The applied patch generated 202 javac compiler warnings (more than the trunk's current 197 warnings).

          -1 findbugs. The patch appears to introduce 5 new Findbugs warnings.

          -1 release audit. The applied patch generated 320 release audit warnings (more than the trunk's current 313 warnings).

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/123/testReport/
          Release audit warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/123/artifact/trunk/patchprocess/releaseAuditDiffWarnings.txt
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/123/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/123/console

          This message is automatically generated.

          Pradeep Kamath added a comment -

          Fixed the commit() code in PigOutputCommitter in the multi-store case to correctly set up the StoreConfig and StoreFunc in the Conf before calling commit() on the StoreFunc.

          Pradeep Kamath added a comment -

          I missed allowing an IOException to be thrown from commit() in CommittableStoreFunc and initialize() in IndexableLoadFunc in my previous patch - attaching a new version with just that change.

          Pradeep Kamath added a comment -

          Dmitriy - by default, when the application does not set an OutputCommitter, hadoop uses FileOutputCommitter. So currently (in trunk code), since pig does not set an OutputCommitter, hadoop would be using FileOutputCommitter. Hence I derived from FileOutputCommitter so that the current cleanup continues to happen and we do the extra commit needed by Zebra.
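
          For illustration, a rough sketch of the arrangement described above (a sketch only - the wiring here is an assumption, not the code from the committed patch):

              import java.io.IOException;

              import org.apache.hadoop.mapred.FileOutputCommitter;
              import org.apache.hadoop.mapred.JobConf;
              import org.apache.hadoop.mapred.JobContext;

              // Sketch: extend FileOutputCommitter so the default cleanup still
              // runs, then perform the extra commit Zebra needs.
              public class PigOutputCommitter extends FileOutputCommitter {

                  @Override
                  public void cleanupJob(JobContext context) throws IOException {
                      // keep the default FileOutputCommitter cleanup
                      super.cleanupJob(context);
                      // then commit any store functions that need it (see the
                      // CommittableStoreFunc discussion below)
                      commitStoreFuncs(context.getJobConf());
                  }

                  // Hypothetical helper: resolve the store function(s) recorded in
                  // the conf and invoke commit() on each CommittableStoreFunc.
                  private void commitStoreFuncs(JobConf conf) throws IOException {
                      // ...
                  }
              }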

          The new load-store redesign already has an allFinished() method in StoreFunc which is the same as this commit() except that it does not take the Configuration - I have modified it to take the Configuration parameter.

          It turns out zebra needs the job configuration in order to open the right-side file during merge join. Hence I am introducing an initialize(Configuration conf) method into the IndexableLoadFunc interface in the attached patch, so that the pig runtime can call it, allowing zebra to store this configuration for use in opening the right-side file later.
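
          For reference, a minimal sketch of the IndexableLoadFunc contract as described in this thread (the signatures are pieced together from the comments and are assumptions, not necessarily the committed interface):

              import java.io.IOException;

              import org.apache.hadoop.conf.Configuration;
              import org.apache.pig.LoadFunc;
              import org.apache.pig.data.Tuple;

              public interface IndexableLoadFunc extends LoadFunc {

                  // Called by the Pig runtime before any seeks, so an implementation
                  // such as Zebra can keep the job configuration and later use it to
                  // open the right-side file.
                  void initialize(Configuration conf) throws IOException;

                  // Position the loader "near" the given key: either at the biggest
                  // key before it or the smallest key after it (see the seekNear
                  // discussion further down in this thread).
                  void seekNear(Tuple key) throws IOException;
              }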

          Dmitriy V. Ryaboy added a comment -

          Pradeep, it seems like PigOutputCommitter should extend OutputCommitter rather than FileOutputCommitter.
          Also – should we add this requirement to the StoreFunc redesign proposal?

          Pradeep Kamath added a comment -

          Zebra needs a global commit method to be able to build an index on the sorted zebra file. Attaching a new patch which introduces a CommittableStoreFunc interface, which extends StoreFunc and adds a commit() method. The zebra store function will implement this interface, and pig will call the commit() method on the CommittableStoreFunc at the completion of the job. While this is not ideal and we could add commit() into StoreFunc itself, that would break existing store functions. Also, if the changes in http://wiki.apache.org/pig/LoadStoreRedesignProposal are implemented soon, this will change anyway - so this new interface is being introduced so that, until we move to the new interface changes recommended in the wiki, we don't break existing store functions.
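
          A hedged sketch of the interface described above (whether commit() takes a Configuration is an assumption based on the surrounding comments; the committed patch may differ):

              import java.io.IOException;

              import org.apache.hadoop.conf.Configuration;
              import org.apache.pig.StoreFunc;

              public interface CommittableStoreFunc extends StoreFunc {

                  // Called once by Pig when the job completes, giving a store
                  // function such as Zebra's a chance to do global work like
                  // building an index over the sorted output.
                  void commit(Configuration conf) throws IOException;
              }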

          Pradeep Kamath added a comment -

          Thanks for the review Ashutosh - updated patch which addresses the concerns.

          Ashutosh Chauhan added a comment -

          ..aah.. I should have dug deeper into the jdk sources. AbstractList, which ArrayList extends, does override equals and provides the correct behavior. So my comment is a non-issue. With the nits taken care of, +1 for the patch.
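
          A quick demonstration of the behavior confirmed above: ArrayList inherits an element-wise equals() from AbstractList.

              import java.util.ArrayList;
              import java.util.Arrays;
              import java.util.List;

              public class ListEqualsDemo {
                  public static void main(String[] args) {
                      List<String> a = new ArrayList<String>(Arrays.asList("x", "y"));
                      List<String> b = new ArrayList<String>(Arrays.asList("x", "y"));
                      System.out.println(a == b);      // false: different references
                      System.out.println(a.equals(b)); // true: element-by-element equality
                  }
              }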

          Ashutosh Chauhan added a comment -

          Changes look good. One comment I have:

          1) In SortInfo.java#equals
          We have two lists and we want to check their equality. I quickly looked up the jdk sources and it seems that ArrayList doesn't override equals, so doing an equals check on the lists would result in a reference equality test, which would be incorrect. The correct way to do this would be to first check the sizes of the two lists; if they are equal, iterate through both lists and check the equality of the items at the same index in the two lists.

          A few nits:
          1) TestMergeJoin contains a System.err.println which we can get rid of.
          2) There are a few unused imports in the patch.
          3) SortInfo.java#getSortColInfoList may result in a Findbugs warning, for a reason similar to the one we discussed earlier in this jira.

          Pradeep Kamath added a comment -

          Attached a patch which has the SortColInfo implementation to convey sort column information in SortInfo. This patch also addresses PIG-981.

          Ashutosh Chauhan added a comment -

          If we are going to re-write that part later, then I guess it may not be worth spending more time perfecting the design here (using SortColInfo instead of parallel ArrayLists)... the patch as it is can be committed, because we are going to update that part of the code in any case.

          Dmitriy V. Ryaboy added a comment -

          Pradeep,

          I think that the current PigSchema can extend or contain a ResourceSchema (probably the latter as Alan indicated that PigSchema does too many other things to be considered equivalent to a ResourceSchema).

          Agreed that this rework is not required to get this patch in; I'm ok with the patch as it stands as long as we remember to go back and fix the duplicated functionality later when the Load/Store redesign is implemented.

          Pradeep Kamath added a comment -

          Dmitriy,
          I looked at the ResourceSchema proposed in http://wiki.apache.org/pig/LoadStoreRedesignProposal and also spoke with Alan to understand the intent more. The eventual goal is for the setSchema() call in StoreFunc to give the ResourceSchema to the store implementation. The ResourceSchema will contain both pig schema information and sort column information. So Zebra or any other storage function which needs to know about sort columns will get the information from the ResourceSchema passed in setSchema().

          However, today there is a way pig runtime conveys the pig schema to store functions (through StoreConfig). We need a separate way to give sort information since pig schema does not have the ability to give it. Since after the rewrite of load/store interfaces this problem will be solved through setSchema(), the solution which we will come up with now in this jira will anyway need to be re-written. So it is cleaner to only keep sort column information in SortColInfo and have an array of SortColInfo in SortInfo. If instead we use ResourceSchema then StoreConfig will have a pig Schema and a Resource Schema which would also be confusing to callers.

          In short, since this piece of code will need a re-write later, it is better not to make it generic now and just address immediate needs; the re-write should remove the multiple representations of schema/sort information.

          Dmitriy V. Ryaboy added a comment -

          Pradeep,
          Have you looked at Alan's Load/Store redesign proposal? It has a structure very similar to the one you are describing, used to describe the schema of the loaded resource.
          I think it makes sense to use that structure throughout, not just at Load time; it could effectively replace SortColumn/SortInfo.

          Pradeep Kamath added a comment -

          Here is a proposal for dealing with sort column information in SortInfo. Rather than giving an ArrayList of column names and a separate ArrayList of asc/desc flags, it would be good to have a unified structure containing both pieces of information per sort column. Also, there are use cases for providing column names (zebra), and for them being optional with column positions provided instead, which some other loader/optimizer might find useful. The type of the column might also be useful if available. Hence, the proposal is to have a SortColumn class with the following attributes: column name, column position (zero-based index), column type, and asc/desc flag. Then in SortInfo there would be a List<SortColumn> which would be available through a getter. This should address both the concerns above. Callers will need to explicitly check for null column names and UNKNOWN column type, since these two scenarios may occur if a schema is not available for the pig runtime to provide the information.

          Thoughts?
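
          A rough sketch of the proposed classes (names and fields follow this comment; the committed patch ultimately used the name SortColInfo, as noted in the comments above):

              import java.io.Serializable;
              import java.util.ArrayList;
              import java.util.List;

              public class SortColumn implements Serializable {
                  public enum Order { ASCENDING, DESCENDING }

                  private final String name; // null if the schema has no name for the column
                  private final int index;   // zero-based column position
                  private final byte type;   // Pig DataType constant; DataType.UNKNOWN if not known
                  private final Order order; // asc/desc flag

                  public SortColumn(String name, int index, byte type, Order order) {
                      this.name = name;
                      this.index = index;
                      this.type = type;
                      this.order = order;
                  }

                  public String getName() { return name; }
                  public int getIndex()   { return index; }
                  public byte getType()   { return type; }
                  public Order getOrder() { return order; }
              }

              // SortInfo would then expose the ordered sort columns via a getter:
              class SortInfo implements Serializable {
                  private final List<SortColumn> sortColumns;

                  SortInfo(List<SortColumn> sortColumns) {
                      this.sortColumns = new ArrayList<SortColumn>(sortColumns);
                  }

                  public List<SortColumn> getSortColumns() {
                      // copy out so callers cannot mutate internal state (per the
                      // Findbugs discussion elsewhere in this jira)
                      return new ArrayList<SortColumn>(sortColumns);
                  }
              }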

          Ashutosh Chauhan added a comment -

          Changes look good. Couple of points:

          I think this internal structure at this point does not need to be optimized for lookup

          Well, it's less about optimization and more about maintainability. First, the relationship between the two parallel arrays is implicit, so someone reading that code needs to "understand" that relationship on their own; if there were only one structure, the relationship would be explicit. Second, there is quite a bit of code around it which, IMO, would be simplified if a single data structure were used instead. That said, either approach works just as well, so I will leave it up to you.

          Zebra needs column names and cannot work with positions

          That is then a limitation of Zebra which it should overcome at some point in time. There might be a good reason for it, but I fail to see what extra information column names provide where the type and position of the columns should be sufficient. This also implies an additional requirement on the user: if data is stored using ZebraStorage and is later loaded back, the user has to provide the same names for the columns that he gave while storing it. No such constraint exists for any other load-store function like PigStorage.

          Pradeep Kamath added a comment -

          Attached new patch against latest trunk:

          Addressing the previous two comments:

          [Ashutosh] What about LinkedHashMap? It provides all the properties we are seeking here, one data structure, O(1) lookup and guaranteed iteration order.

          LinkedHashMap is also a good choice - I think this internal structure at this point does not need to
          be optimized for lookup - hence leaving it as is

          will result in NPE when both obj1 and obj2 are null.

          Fixed the NPE in Utils.checkNullAndClass - good catch!

          A minor detail: Suppose obj1 is declared of type ArrayList<Integer> and obj2 is declared of type ArrayList<String>; obj1.getClass() == obj2.getClass() will return true thanks to type erasure by the java compiler. Not sure if that's OK or not for the check here.

          You are right - however there is no way to work around type erasure - this is the best we can do.
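
          To illustrate the type-erasure point, both lists below report the same runtime class even though their compile-time type parameters differ:

              import java.util.ArrayList;
              import java.util.List;

              public class ErasureDemo {
                  public static void main(String[] args) {
                      List<Integer> ints = new ArrayList<Integer>();
                      List<String> strs = new ArrayList<String>();
                      // true: generics are erased, both are just ArrayList at runtime
                      System.out.println(ints.getClass() == strs.getClass());
                  }
              }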

          Does zebra require columns to be named? If it doesn't, then SortInfo could be changed in such a way that it can provide column positions instead of names to the loader, if the columns aren't named.

          Zebra needs column names and cannot work with positions

          Isn't this blocked on https://issues.apache.org/jira/browse/PIG-930

          bz2 handling needs to be fixed but this code will be needed when it is fixed. This does not make things any
          worse since bz2 is currently already broken

          because if status is Error, execution should be stopped and the exception should be thrown as early as possible instead of continuing to do work which will be wasted. If the status is Null, an NPE will occur while doing the join.

          Fixed to throw exception

          there is no need of passing rightinputfilename from MRCompiler to POMergeJoin

          We do need these calls to tell Zebra the filename - you are right that pig's DefaultIndexableLoader
          doesn't need these - but the code has to work with Zebra also.

          Also, seekNear() doesn't sound right. How about seekToClosest() ?

          I know seekNear() is vague and intentionally so - the hope is that users will read the javadoc comments
          to know how to implement it - seekToClosest would be equally vague in my opinion

          I think introducing order preserving flag on logical operator is a good idea.

          I think the order preserving flag idea should be addressed in a different jira as it
          is orthogonal to this jira

          Instead, if we use the following, we will achieve the same thing, and then neither Findbugs will complain nor will there be a need for our own copy method.

          Fixed - removed Utils.getCopy

          In POMergeJoin.java

              // we should never get here!
              return new Result(POStatus.STATUS_ERR, null);
          
          could be changed to
          
              // we should never get here!
              throw new ExecException(errMsg, 2176);
          

          because if we ever get there, it will otherwise result in an NPE later on.

          The method has to return a Result. I think this will just be passed down the pipeline as an error
          and should not result in an NPE going by the code in getNext():

              Result rightInp = getNextRightInp();
              if (rightInp.returnStatus != POStatus.STATUS_OK) {
                  prevRightInp = null;
                  return rightInp;
              }
          

          Per a previous review comment, also changed Utils.checkNullEquals() to the following:

              public static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) {
                  if (obj1 == null || obj2 == null) {
                      return obj1 == obj2;
                  }
                  if (checkEquality) {
                      if (!obj1.equals(obj2)) {
                          return false;
                      }
                  }
                  return true;
              }
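
          For illustration, a hypothetical equals() built on this helper, along the lines Pradeep describes later in this thread (the class and field names here are made up; Utils.checkNullEquals is the method shown above):

              import java.util.List;

              public class SortInfo {
                  private List<String> sortColNames;
                  private List<Boolean> ascColumns;

                  @Override
                  public boolean equals(Object other) {
                      if (!(other instanceof SortInfo)) {
                          return false;
                      }
                      SortInfo that = (SortInfo) other;
                      // delegate the mundane null handling to the helper
                      return Utils.checkNullEquals(sortColNames, that.sortColNames, true)
                          && Utils.checkNullEquals(ascColumns, that.ascColumns, true);
                  }
              }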
          
          Ashutosh Chauhan added a comment -

          And couple more:
          8.

          Findbugs complains about passing internal members as-is in getters, since the caller can then modify these internal members - hence the copy.

              public List<Boolean> getAscColumns() {
                  return Utils.getCopy(ascColumns);
              }
          

          Instead, if we use the following, we will achieve the same thing, and then neither Findbugs will complain nor will there be a need for our own copy method.

              public List<Boolean> getAscColumns() {
                  return new ArrayList<Boolean>(ascColumns);
              }
          

          9. In POMergeJoin.java

                  // we should never get here!
                  return new Result(POStatus.STATUS_ERR, null);
          

          could be changed to

                  // we should never get here!
                  throw new ExecException(errMsg,2176);
          

          because if we ever get there, it will otherwise result in an NPE later on.

          Ashutosh Chauhan added a comment -

          1. [Pradeep] the zebra store function basically needs to know the sort keys in order and which of them are asc/desc. For this they would iterate over our data structure and require that the ordering of the keys match the primary/secondary order of the sort keys

          [Ashutosh] What about LinkedHashMap? It provides all the properties we are seeking here, one data structure, O(1) lookup and guaranteed iteration order.

          2. In Utils.java

              public static boolean checkNullAndClass(Object obj1, Object obj2) {
                  return checkNullEquals(obj1, obj2, false) && obj1.getClass() == obj2.getClass();
              }
          

          will result in NPE when both obj1 and obj2 are null.

          A minor detail: Suppose obj1 is declared of type ArrayList<Integer> and obj2 is declared of type ArrayList<String>; obj1.getClass() == obj2.getClass() will return true thanks to type erasure by the java compiler. Not sure if that's OK or not for the check here.
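
          A null-safe rewrite addressing the NPE pointed out above would look something like this (a sketch; the actual fix landed in a later revision of the patch):

              public static boolean checkNullAndClass(Object obj1, Object obj2) {
                  if (obj1 == null || obj2 == null) {
                      // both null counts as a match; exactly one null does not
                      return obj1 == obj2;
                  }
                  return obj1.getClass() == obj2.getClass();
              }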

          3. In StoreConfig.java, one of the scenarios in which SortInfo is returned as null is:

          * 3) the store follows an "order by" but the schema
          * of "order by" does not have column name(s) for the sort
          * column(s)
          

          I understand that the reason for this additional constraint is that SortInfo maintains a list of column names. But even if the schema contains only type information and not the column names, that is still sufficient information to build indexes. Information about which column the data is sorted on can be recorded using column positions, can't it? Does zebra require columns to be named? If it doesn't, then SortInfo could be changed in such a way that it provides column positions instead of names to the loader, if the columns aren't named.

          In POMergeJoin.java
          4.

          +        currentFileName = lFile.getFileName();
          +        loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
          +        is = FileLocalizer.open(currentFileName, offset, pc);
          +        if (currentFileName.endsWith(".bz") || currentFileName.endsWith(".bz2")) {
          +            is = new CBZip2InputStream((SeekableInputStream)is, 9);
          +        } else if (currentFileName.endsWith(".gz")) {
          +            is = new GZIPInputStream(is);
          +        }
          +
          

          Isn't this blocked on https://issues.apache.org/jira/browse/PIG-930 ?

          5.

          default: // We don't deal with ERR/NULL. just pass them down
              return res;
          

          should be changed to

          default: 
              throwProcessingException(false,null);
          

          because if status is Error, execution should be stopped and the exception should be thrown as early as possible instead of continuing to do work which will be wasted. If the status is Null, an NPE will occur while doing the join.

          6.

          InputStream is = FileLocalizer.open(rightInputFileName, pc);
          rightLoader.bindTo(rightInputFileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
          

          I don't see any use of this code. I think it's not required and can be removed.

          In fact, there is no need for the following function either:

              /**
               * @param rightInputFileName the rightInputFileName to set
               */
              public void setRightInputFileName(String rightInputFileName) {
                  this.rightInputFileName = rightInputFileName;
              }
          

          The file name of the right side is obtained from the index, which is contained in the index file. The index file is passed directly as a constructor argument of the IndexableLoadFunc, so there is no need to pass rightInputFileName from MRCompiler to POMergeJoin.
          And if this reasoning is correct, then DefaultIndexableLoader.bindTo() should throw an IOException, because the contract on DefaultIndexableLoader is that it is initialized with all the info it needs in the constructor, and then seekNear is called on it to seek to the correct location. bindTo() shouldn't be used for this loader.
          Also, seekNear() doesn't sound right. How about seekToClosest()?

          7. I think introducing an order-preserving flag on logical operators is a good idea.
          First, it is self-documenting, as the information is contained within the operator and not checked by doing instanceof elsewhere in the code.
          Second, it is useful information which, if present, can help the optimizer make smart decisions. As an example, the optimizer can rewrite a symmetric hash join to a merge-sort join if all the logical operators in the query DAG from the join inputs to the root have this flag set to true. Without this flag, doing such optimizations will be hard.
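
          A sketch of the flag proposed in point 7 (note that this idea was ultimately rejected - see Alan Gates's -1 below - so nothing like it exists in Pig):

              public abstract class LogicalOperator {
                  // true if this operator emits its input tuples in their original
                  // order (e.g. limit, filter, column-pruning foreach under the
                  // then-current implementation)
                  private boolean orderPreserving = false;

                  public boolean isOrderPreserving() {
                      return orderPreserving;
                  }

                  protected void setOrderPreserving(boolean orderPreserving) {
                      this.orderPreserving = orderPreserving;
                  }
              }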

          Olga Natkovich added a comment -

          +1 on what Alan said

          Alan Gates added a comment -

          -1 to adding an orderPreserving flag on operators. We have no intention of ever promising that any relational operator beyond Order and Limit preserves order. The fact that some happen to do so now (like filter) is a side effect of the current implementation, not a feature. If we add a flag, it becomes a feature that we will be expected to maintain.

          Dmitriy V. Ryaboy added a comment -

          I got my trues and falses reversed on the NPE thing. You are right, the function works as intended.
          I still think it's too verbose, but I agree that it's a style issue – I guess if the committers like it, it's fine.

          Dmitriy V. Ryaboy added a comment -

          Pradeep: Pig only guarantees order with limit following order - for any other relational operator following order there are no guarantees. Today it is true that a filter or a column-pruning foreach would also preserve order, but this can change if needed in the future. There is explicit code to ensure the order-limit combination works by preserving order - there is no such explicit check for other operators (keeping it open for change in the future)

          That actually tells me that an orderPreserving property on a LogicalOperator is a really good idea.
          That way we can set it to true on all operators that are at the moment order-preserving (limit, filter, column-pruning foreach), and not commit to forever maintaining that contract. If filter starts changing order, the patch will simply have to include a change to set orderPreserving to false in POFilter, and everything will work automagically.

          Pradeep Kamath added a comment -

          I see the first part as a coding style preference - I have done both styles in code myself - I don't think it is a major readability issue with the current implementation.

          Can you explain the NPE? If either object is null, the code would return false unless both are null. If checkEquality is false, the caller should know that only null equality has been checked thus far, and that if true was returned then the two objects are both null and hence equal. My main intent was to have this helper function be used from other classes' equals() implementations, so that this mundane check for null need not be repeated in every equals implementation. Maybe I am not understanding your use case - an example might help.

          Dmitriy V. Ryaboy added a comment -

          Pradeep – I would argue that my rewrite is not more terse, but less verbose
          I just don't see how

          if (boolean) {
            return true;
          } else {
            return false;
          }
          

          is ever better or more readable than

          return boolean;
          

          As far as oddness, I would argue that since the person who wrote the function introduced a possible NPE a few lines lower, someone else can hardly be expected to use it properly. See my comment about checkEquality being false. The function as written misses a corner case.

          Pradeep Kamath added a comment -

          Response to previous comment:

          Dmitriy: seekNear seems ambiguous, as "near" is a generic concept that does not necessarily imply "before or to, but not after" - which is what this method is required to do. How about "seekBefore()"?
          Pradeep: I had initially thought of naming this method seekBefore(). However, for the case where the key we are using to seek is not present in the right input, the loader can either position at the biggest value before the key OR the smallest value after the key (the zebra loader, for example, can only do the latter). The name "seekBefore" suggests that implementations should always seek before the key in question - hence I chose seekNear.

          Dmitriy: it looks like the large diff blocks in MergeSort et al are mostly moves of code blocks, not significant code changes, correct?
          Pradeep: I am assuming you meant POMergeJoin - yes - it is mostly a move of code to DefaultIndexableLoader

          Dmitriy: Why do getAscColumns and getSortColumns make a copy of the list? Seems like we can save some memory and CPU here.
          Pradeep: Findbugs complains about passing internal members as-is in getters, since the caller can then modify these internal members - hence the copy. This should not be a performance/memory issue since these copies are 1) on small structures 2) in the front end at compile time and not at runtime.

          Dmitriy: For that matter, why not use a map of (String)colName -> (Boolean)ascending instead of 2 lists? One structure, plus O(1) lookup.
          Pradeep: This suggestion is reasonable - I picked the current implementation since it is in line with how these things are represented today in LOSort. Unless the user of SortInfo does lookups using sort column names, we won't get O(1). I designed this keeping in mind zebra's use case (which is the only use case at this point): the zebra store function basically needs to know the sort keys in order and which of them are asc/desc. For this they would iterate over our data structure and require that the ordering of the keys match the primary/secondary order of the sort keys - hence a list lends itself better to that. I debated using a List<Pair<String,Boolean>> but thought I could avoid Pair since it is a pig implementation class (in case tomorrow zebra wants to expose the same SortInfo to its users).

          Dmitriy: Not sure about the use of super() in the constructor of a class that doesn't extend anything but Object. Is there some magic that requires it?
          Pradeep: This was unintended - all thanks to eclipse - will remove it in the next iteration of this patch

          Dmitriy: In Log2PhysTranslator, why hardcode the Limit operator? There are other operators that don't change sort order, such as filter. Perhaps add a method to Logical Operators that indicates if they alter sort order of their inputs?
          Pradeep: Pig only guarantees order with limit following order - for any other relational operator following order there are no guarantees. Today it is true that a filter or a column-pruning foreach would also preserve order, but this can change if needed in the future. There is explicit code to ensure the order-limit combination works by preserving order - there is no such explicit check for other operators (keeping it open for change in the future)

          Dmitriy: Even with this rewrite, this seems like an odd function. Its oddness leads to it not being used safely when you set checkEquality to false (just a few lines later) - if obj1 is null and obj2 is not, the function returns true, you try to call a method on obj1, and get an NPE.
          Pradeep: The rewrite is more terse and is another option to the explicit if-else I have - more a coding style issue than a correctness one. The idea of having this function was to serve as a helper for classes which need to implement equals(). It is cumbersome that every new class's equals() has to do the equivalent of what you suggest:

          Util.bothNull(obj1, obj2) || (Util.notNull(obj1, obj2) && obj1.equals(obj2));
          

          This one method can be used either to only check that both objects are not null OR to do that and additionally check equality. Not sure I understand the oddity - am I missing something?
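
          A hypothetical use of the helper inside an equals() implementation; the field name and the helper's exact signature are assumptions based on the discussion above, not Pig's actual code:

          import java.util.List;

          public class SortInfoEqualsSketch {
              private List<String> sortColNames;

              static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) {
                  if (obj1 == null || obj2 == null) {
                      return obj1 == obj2;                 // true only when both are null
                  }
                  return !checkEquality || obj1.equals(obj2);
              }

              @Override
              public boolean equals(Object other) {
                  if (!(other instanceof SortInfoEqualsSketch)) {
                      return false;
                  }
                  SortInfoEqualsSketch that = (SortInfoEqualsSketch) other;
                  return checkNullEquals(this.sortColNames, that.sortColNames, true);
              }

              @Override
              public int hashCode() {
                  return sortColNames == null ? 0 : sortColNames.hashCode();
              }
          }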

          Dmitriy: This comment has a typo (and instead of "an"):
          Pradeep: Will fix in next iteration

          Dmitriy V. Ryaboy added a comment -

          Pradeep,

          First, I think this is very important to have, not just for Merge but for other things that might benefit from knowing sort orders, as well.

          A few minor nits from a cursory glance at the code. I didn't check the actual logic very carefully yet – it looks like the large diff blocks in MergeSort et al are mostly moves of code blocks, not significant code changes, correct?

          On to the comments:

          seekNear seems ambiguous, as "near" is a generic concept that does not necessarily imply "before or to, but not after" – which is what this method is required to do. How about "seekBefore()"?

          Why do getAscColumns and getSortColumns make a copy of the list? Seems like we can save some memory and CPU here.

          For that matter, why not use a map of (String)colName -> (Boolean)ascending instead of 2 lists? One structure, plus O(1) lookup.
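
          A sketch of that single-structure suggestion: a LinkedHashMap preserves insertion order (so primary/secondary key order survives iteration) while still giving O(1) lookup by column name; the names are illustrative:

          import java.util.LinkedHashMap;
          import java.util.Map;

          public class SortOrderMapSketch {
              private final Map<String, Boolean> sortCols =
                      new LinkedHashMap<String, Boolean>();

              public void addSortCol(String name, boolean ascending) {
                  sortCols.put(name, ascending);
              }

              public boolean isAscending(String name) {
                  Boolean asc = sortCols.get(name);        // O(1), unlike a list scan
                  return asc != null && asc.booleanValue();
              }
          }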

          Not sure about the use of super() in the constructor of a class that doesn't extend anything but Object. Is there some magic that requires it?

          In Log2PhysTranslator, why hardcode the Limit operator? There are other operators that don't change sort order, such as filter. Perhaps add a method to Logical Operators that indicates if they alter sort order of their inputs?

          In Utils

          checkNullEquals is better written as

          if (obj1 == null || obj2 == null) {
              return obj1 == obj2;
          } else {
              return checkEquality ? obj1.equals(obj2) : true;
          }
          

          Even with this rewrite, this seems like an odd function. Its oddness leads to it not being used safely when you set checkEquality to false (just a few lines later) - if obj1 is null and obj2 is not, the function returns true, you try to call a method on obj1, and get an NPE.
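
          A stub reproducing the behaviour described here, purely to make the corner case concrete - Pig's actual helper may differ:

          public class NpeCornerCaseSketch {
              // Stub: with checkEquality == false the one-sided null slips through.
              static boolean checkNullEquals(Object obj1, Object obj2, boolean checkEquality) {
                  if (!checkEquality) {
                      return true;                         // buggy: ignores a one-sided null
                  }
                  if (obj1 == null || obj2 == null) {
                      return obj1 == obj2;
                  }
                  return obj1.equals(obj2);
              }

              public static void main(String[] args) {
                  String left = null;
                  if (checkNullEquals(left, "x", false)) { // returns true as described
                      System.out.println(left.length());   // NullPointerException here
                  }
              }
          }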

          Probably better not to roll all this into one amorphous function and simply write

          Util.bothNull(obj1, obj2) || (Util.notNull(obj1, obj2) && obj1.equals(obj2));
          

          (the implementations of bothNull and notNull are obvious – just conjunction and disjunction of obj == null)
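
          Spelled out, those helpers might look like this (a sketch; Util is a hypothetical placeholder class):

          final class Util {
              static boolean bothNull(Object obj1, Object obj2) {
                  return obj1 == null && obj2 == null;
              }

              static boolean notNull(Object obj1, Object obj2) {
                  return obj1 != null && obj2 != null;
              }
          }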

          In StoreConfig
          This comment has a typo (and instead of "an"):
          "* 1) the store does not follow and order by"

          Pradeep Kamath added a comment -

          Attached patch has the changes in pig to support interacting with loaders which can construct the index internally. The main change is to hide the index access behind an interface. So a new interface called IndexableLoadFunc, which extends LoadFunc, has been introduced with the following two methods:

          /**
               * This method is called by the pig runtime to indicate
               * to the LoadFunc to position its underlying input stream
               * near the keys supplied as the argument. Specifically:
               * 1) if the keys are present in the input stream, the loadfunc
               * implementation should position its read position to 
               * a record where the key(s) is/are the biggest key(s) less than
               * the key(s) supplied in the argument OR to the record with the
               * first occurrence of the key(s) supplied.
               * 2) if the key(s) are absent in the input stream, the implementation
               * should position its read position to a record where the key(s)
               * is/are the biggest key(s) less than the key(s) supplied OR to the
               * first record where the key(s) is/are the smallest key(s) greater
               * than the key(s) supplied. 
               * The description above holds for descending order data in 
               * a similar manner with "biggest" and "less than" replaced with
               * "smallest" and "greater than" and vice versa.
               *  
               * @param keys Tuple with join keys (which are a prefix of the sort
               * keys of the input data). For example if the data is sorted on
               * columns in position 2,4,5 any of the following Tuples are
               * valid as an argument value:
               * (fieldAt(2))
               * (fieldAt(2), fieldAt(4))
               * (fieldAt(2), fieldAt(4), fieldAt(5))
               * 
               * The following are some invalid cases:
               * (fieldAt(4))
               * (fieldAt(2), fieldAt(5))
               * (fieldAt(4), fieldAt(5))
               * 
               * @throws IOException When the loadFunc is unable to position
               * to the required point in its input stream
               */
              public void seekNear(Tuple keys) throws IOException;
              
              
              /**
               * A method called by the pig runtime to give an opportunity
               * for implementations to perform cleanup actions like closing
               * the underlying input stream. This is necessary since while
               * performing a join the pig runtime may determine that no further
               * join is possible with the remaining records and may indicate to the
               * IndexableLoadFunc to clean up by calling this method.
               * 
               * @throws IOException if the loadfunc is unable to perform
               * its close actions.
               */
              public void close() throws IOException;
          

          The idea is that POMergeJoin will use seekNear to indicate to the loader that it should position itself at the correct point in the right input. To keep the POMergeJoin implementation simple, for the default case (where the loader - for example PigStorage - does not implement IndexableLoadFunc), a DefaultIndexableLoader which wraps the real loader and provides the implementation of IndexableLoadFunc's methods will be used. In this case an index will be created as it is done currently, and DefaultIndexableLoader will use that index to implement IndexableLoadFunc's methods. (A simplified sketch of the interaction follows.)
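
          A simplified sketch of how the join side is expected to drive an IndexableLoadFunc; apart from seekNear(), close() and LoadFunc's getNext(), the names (and the package of IndexableLoadFunc) are assumptions, not POMergeJoin's actual code:

          import java.io.IOException;

          import org.apache.pig.IndexableLoadFunc;
          import org.apache.pig.data.Tuple;

          public class MergeJoinDriverSketch {
              void joinWithRightInput(IndexableLoadFunc rightLoader, Tuple leftKeys)
                      throws IOException {
                  rightLoader.seekNear(leftKeys);          // position via the loader's index
                  Tuple right;
                  while ((right = rightLoader.getNext()) != null) {
                      // compare join keys here; emit matches and stop once the
                      // right-side key exceeds the left-side key
                  }
                  rightLoader.close();                     // let the loader clean up
              }
          }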

          A SortInfo class containing the names of the sort columns and ascending/descending information is also introduced and will be available through StoreConfig. This will be useful for ZebraStore to determine whether the data it is writing out is sorted and to create an index appropriately. (A sketch of such a consumer follows.)
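
          A sketch of how a sort-aware store function (zebra's, say) might consume SortInfo; getSortInfo() on StoreConfig, the two getters, and the package locations are assumed from the description above, so treat the exact signatures as illustrative:

          import java.io.IOException;
          import java.util.List;

          import org.apache.pig.SortInfo;     // package locations assumed
          import org.apache.pig.StoreConfig;

          public class SortAwareStoreSketch {
              void prepareIndex(StoreConfig storeConfig) throws IOException {
                  SortInfo sortInfo = storeConfig.getSortInfo();
                  if (sortInfo == null) {
                      return;                              // data not sorted: no index
                  }
                  List<String> sortCols = sortInfo.getSortColumns();
                  List<Boolean> ascending = sortInfo.getAscColumns();
                  for (int i = 0; i < sortCols.size(); i++) {
                      // record the i-th sort key and its direction in the index metadata
                      recordIndexKey(sortCols.get(i), ascending.get(i));
                  }
              }

              private void recordIndexKey(String column, boolean asc) {
                  // placeholder for the store function's own index bookkeeping
              }
          }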


            People

            • Assignee:
              Pradeep Kamath
            • Reporter:
              Pradeep Kamath
            • Votes:
              0
            • Watchers:
              1
