HIVE-1197: create a new input format where a mapper spans a file

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.6.0
    • Component/s: Query Processor
    • Hadoop Flags: Reviewed

      Description

      This will be needed for Sort merge joins.

      Attachments

      1. hive.1197.1.patch
        22 kB
        Siying Dong
      2. hive.1197.2.patch
        34 kB
        Siying Dong
      3. hive.1197.3.patch
        34 kB
        Siying Dong
      4. hive.1197.4.patch
        34 kB
        Siying Dong

        Activity

        Zheng Shao added a comment -

        Can you explain what "a mapper spans a file" means?

        Namit Jain added a comment -

        Currently, the split that a mapper processes is determined by a variety of parameters, including the dfs block size, min split size, etc.

        It might be useful to have an option that lets a mapper scan exactly one file. This will be especially useful for sort-merge join.
        If the data is partitioned into various buckets, and each bucket is sorted, the sort-merge join can join the corresponding buckets together.

        For example, consider the following scenario:

        table T1: sorted and bucketed by column 'key' into 1000 buckets
        table T2: sorted and bucketed by column 'key' into 1000 buckets

        and the query:

        select * from T1 join T2 on key
        executed as a map join.

        Instead of joining table T1 with T2 as a whole, the 1000 buckets can be joined with each other individually.
        Since the data is sorted on the join key, a sort-merge join can be used.
        Say the buckets are named b0001, b0002, ..., b1000.
        Say table T1 is the big table, and the buckets from T2 are read as part of the mapper spawned to process T1;
        under the current approach, it will be very difficult to perform outer joins.

        For example, if bucket b1 for T1 contains:

        1
        2
        5
        6
        9
        16
        22
        30

        and the corresponding bucket for T2 contains:

        2
        4
        8

        If there are 2 mappers for bucket b1 of T1, processing 4 records each ((1, 2, 5, 6) and (9, 16, 22, 30) respectively),
        it will be very difficult to perform an outer join. Each mapper would need to peek into the previous record
        and the next record respectively.
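
        To make the argument concrete, here is a toy sketch (plain Java, not Hive code) of a sort-merge full outer
        join over the two sorted buckets above. It assumes each side reads its whole bucket and that keys are unique
        within a bucket; if a bucket were split across two mappers, neither mapper could tell on its own whether a
        key from the other table is unmatched.

        public class SortMergeOuterJoinSketch {
          public static void main(String[] args) {
            int[] t1 = {1, 2, 5, 6, 9, 16, 22, 30}; // bucket b1 of T1, sorted on the join key
            int[] t2 = {2, 4, 8};                   // corresponding bucket of T2, sorted
            int i = 0, j = 0;
            while (i < t1.length || j < t2.length) {
              if (j >= t2.length || (i < t1.length && t1[i] < t2[j])) {
                System.out.println(t1[i] + "\tNULL");     // key only in T1
                i++;
              } else if (i >= t1.length || t2[j] < t1[i]) {
                System.out.println("NULL\t" + t2[j]);     // key only in T2
                j++;
              } else {
                System.out.println(t1[i] + "\t" + t2[j]); // key present in both
                i++;
                j++;
              }
            }
          }
        }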

        Moreover, it will be very difficult to ensure that the result also has 1000 buckets. Another map-reduce job
        would be needed just for that.

        This can be easily solved if we are guaranteed that the whole bucket (or the file corresponding to the bucket)
        will be processed by a single mapper.
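
        A minimal sketch of the guarantee being asked for, using only the standard Hadoop mapred API (this is not the
        HIVE-1197 patch itself, and the class name is made up for illustration): returning false from isSplitable()
        makes the framework create exactly one split per file, so each bucket file is consumed by a single mapper.

        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapred.TextInputFormat;

        // Illustrative only: a text input format whose files are never split,
        // so every bucket file maps to exactly one mapper.
        public class OneMapperPerFileTextInputFormat extends TextInputFormat {
          @Override
          protected boolean isSplitable(FileSystem fs, Path file) {
            return false; // never split a file across mappers
          }
        }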

        Namit Jain added a comment -

        Overall, looks good - some general comments.

        Would it be a good idea to make BucketizedHiveInputFormat extend HiveInputFormat, and BucketizedHiveRecordReader extend HiveRecordReader?
        You won't have to copy a lot of code, and it would be easier to maintain. For example, the check for ExecMapper in HiveRecordReader and similar future
        optimizations would be easier to keep in one place.

        He Yongqiang added a comment -

        Looks very good overall, congrats!

        Just a few minor comments:
        1. Can you change inputFormatClassName to use getter and setter methods?
        2. Some code is duplicated from HiveInputFormat; can we reuse it?
        3. In BucketizedHiveRecordReader's next(), I think we should remove the check of "curReader == null". We should throw an exception if curReader == null, which means the reader has been closed.
        4. I think we should remove line 207 in BucketizedHiveInputFormat: newjob.setInputFormat(inputFormat.getClass());
        5. In HiveRecordReader,
        5.1 Progress is calculated as (number of splits done) / (total split number); can we make it more accurate? Assuming the work is evenly divided among all splits, something like: (number of splits done) / (total split number) + currReader.getProgress();
        5.2 getPos() should return currReader.getPos().

        Another question: do you think it is a good idea to let BucketizedHiveInputFormat extend HiveInputFormat? That way, the code would be clearer. And we should put the RecordReader and InputSplit in the same file as BucketizedHiveInputFormat.

        He Yongqiang added a comment -

        Correction to 5.1: it should be ((number of splits done) + currReader.getProgress()) / (total split number).
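
        A minimal sketch of the corrected formula (the class and field names below are made up for illustration, not
        taken from the patch): the composite reader's progress combines the number of fully consumed splits with the
        current underlying reader's own progress.

        import java.io.IOException;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.RecordReader;

        // Illustrative only: progress of a reader that chains through several
        // underlying splits of the same logical input.
        public class ChainedProgressSketch {
          private int totalSplits;                            // total split number
          private int splitsDone;                             // number of splits fully consumed so far
          private RecordReader<LongWritable, Text> curReader; // reader over the split currently being read

          public float getProgress() throws IOException {
            float current = (curReader == null) ? 0f : curReader.getProgress();
            // ((number of splits done) + progress within the current split) / (total split number)
            return (splitsDone + current) / (float) totalSplits;
          }
        }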

        Siying Dong added a comment -

        No change from hive.1197.3.patch besides adding a newline at the end of the file.

        Namit Jain added a comment -

        What is the reason for creating such a large table in the test?
        Is that necessary for testing, since changing dfs.block.size is not helping?

        Namit Jain added a comment -

        Looks good to me. Can you update the test result file? I am getting a diff.
        Other than that, I am fine and can merge, unless Yongqiang has some additional comments.

        Siying Dong added a comment -

        At least when I tried it from my box, it doesn't work.

        Namit Jain added a comment -

        Does the new test work for you?

        Siying Dong added a comment -

        Updated the test output file.

        Namit Jain added a comment -

        +1

        will commit if the tests pass

        Namit Jain added a comment -

        Committed. Thanks Siying


          People

          • Assignee: Siying Dong
          • Reporter: Namit Jain
          • Votes: 0
          • Watchers: 3
