HIVE-439: Merge small files after a map-only job

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.4.0
    • Component/s: Query Processor
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      HIVE-439. Merge small files after a map-only job. (Namit Jain via zshao)

      Description

      There are cases where the input to a Hive job consists of thousands of small files. In this case, there is a mapper for each file. Most of the overhead of spawning all these mappers can be avoided if these small files are combined into fewer, larger files.

      The problem can also be addressed by having a mapper span multiple blocks as in:

      https://issues.apache.org/jira/browse/HIVE-74

      But it also makes sense in Hive to merge files whenever possible.

      <property>
        <name>hive.merge.mapfiles</name>
        <value>true</value>
        <description>Merge small files at the end of the job</description>
      </property>
      
      <property>
        <name>hive.merge.size.per.task</name>
        <value>256000000</value>
        <description>Size of merged files at the end of the job</description>
      </property>
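
      For illustration, here is a minimal sketch of reading and overriding these settings through the standard Hadoop Configuration API (which HiveConf extends). The property names come from the snippets above; the class name and everything else are assumptions made for the example.

      import org.apache.hadoop.conf.Configuration;

      // Minimal sketch, assuming the standard Hadoop Configuration API; the
      // property names come from the hive-default.xml snippets above.
      public class MergeSettingsSketch {
          public static void main(String[] args) {
              Configuration conf = new Configuration();

              // Enable merging of small output files after a map-only job.
              conf.setBoolean("hive.merge.mapfiles", true);
              // Target size (in bytes) for the merged files.
              conf.setLong("hive.merge.size.per.task", 256000000L);

              boolean mergeEnabled = conf.getBoolean("hive.merge.mapfiles", false);
              long targetSize = conf.getLong("hive.merge.size.per.task", 256000000L);
              System.out.println("merge=" + mergeEnabled + ", target=" + targetSize);
          }
      }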
      
      1. hive.439.1.patch
        1.50 MB
        Namit Jain
      2. hive.439.2.patch
        1.45 MB
        Namit Jain
      3. hive.439.3.patch
        1.45 MB
        Namit Jain
      4. hive.439.4.patch
        1.45 MB
        Namit Jain
      5. hive.439.5.patch
        1.45 MB
        Namit Jain

        Activity

        Namit Jain added a comment -

        Before the moveTask, another DummyTask can be created to merge the files if needed. Some heuristics can be used to start with,
        like the average size of a file, etc.

        Namit Jain added a comment -

        Instead of having a map-only job to do the concatenation, I think we should use a map-reduce job.
        This way, when sorting properties of a table are maintained in the metastore, they can be preserved.

        Zheng Shao added a comment -

        @hive.439.1.patch
        We don't need anything in the sort key of the reduceSinkOperator. Can you remove the "rand()" from the sort key?
        That will make the process faster because all rows are equal (for sorting purposes).

        Namit Jain added a comment -

        But doesn't that mean that all rows will go to the same reducer - which might be bad?
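
        (One hedged reading of this exchange: in a reduce sink, the partition key, not the sort key, decides which reducer receives a row, so rand() can stay in the partition key while the sort key is left empty. The toy sketch below illustrates that separation; all names are invented for illustration and are not Hive's actual ReduceSinkOperator internals.)

        import java.util.Random;

        // Toy sketch (names invented, not Hive's ReduceSinkOperator): the
        // partition key decides which reducer a row goes to, while the sort
        // key only orders rows within a reducer. A random partition key
        // spreads rows across reducers even when the sort key is empty.
        public class RandomPartitionSketch {
            private static final Random RAND = new Random();

            // Pick a reducer for a row, independently of any sort key.
            static int chooseReducer(int numReducers) {
                return RAND.nextInt(numReducers);
            }

            public static void main(String[] args) {
                for (int row = 0; row < 8; row++) {
                    System.out.println("row " + row + " -> reducer " + chooseReducer(4));
                }
            }
        }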

        Zheng Shao added a comment -

        @hive.439.1.patch
        rand_partitionpruner2.q:
        -query: select * from tmptable x sort by x.key
        +query: select * from tmptable x sort by x.key,value
        ...
        +103 val_103 2008-04-08 12
        103 val_103 2008-04-08 11
        -103 val_103 2008-04-08 12

        I guess we have to sort by all columns to make the result deterministic.

        The same thing with input_part2.

        Also, shall we rename "Conditional Operator" to "Conditional Task" in explain plan? In Hive world, Operator usually means one operator in the map-reduce tasks.
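
        To illustrate the determinism point, here is a small sketch (plain Java, written for this note, not taken from the patch): with duplicate keys, sorting by the key alone leaves the relative order of equal-key rows up to the runtime, while tie-breaking on the remaining columns pins down a single result.

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Comparator;
        import java.util.List;

        public class DeterministicSortSketch {
            public static void main(String[] args) {
                // Two rows sharing the same sort key, as in the diff above.
                List<String[]> rows = new ArrayList<>(Arrays.asList(
                        new String[]{"103", "val_103", "2008-04-08", "12"},
                        new String[]{"103", "val_103", "2008-04-08", "11"}));

                // Key-only comparator: the two rows compare equal, so their
                // final order depends on the input order, which can vary from
                // run to run (e.g. with the number of mappers).
                Comparator<String[]> byKey = Comparator.comparing(r -> r[0]);

                // Tie-breaking on the remaining columns yields one unique,
                // deterministic output order.
                Comparator<String[]> byAllColumns = byKey
                        .thenComparing(r -> r[1])
                        .thenComparing(r -> r[2])
                        .thenComparing(r -> r[3]);

                rows.sort(byAllColumns);
                for (String[] r : rows) {
                    System.out.println(String.join("\t", r));
                }
            }
        }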

        Namit Jain added a comment -

        incorporated comments

        Zheng Shao added a comment -

        @hive.439.2.patch

        New configuration variables should be added to conf/hive-default.xml and data/conf/hive-default.xml

        Namit Jain added a comment -

        added comments

        Zheng Shao added a comment -

        @hive.439.3.patch:
        I don't understand how the ConditionalTask gets the resolver and resolverCtx at execution time.

        Are they serialized together with the ConditionalTask?
        If so, we need to mark those classes as serializable and move them to ConditionalWork, right?
        Otherwise it kind of breaks the implicit contract that everything that needs to be serialized is in the Work instead of the Task.

        Namit Jain added a comment -

        It does not - the ConditionalTask is executed on the client side, i.e. the Hive server.
        The resolver and resolverCtx can be accessed there - no serialization required -
        it is not a map-reduce task.

        It is just a placeholder to decide which task to execute, since this decision cannot be made
        at compile time and must be made after certain tasks have executed.
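
        To make the placeholder idea concrete, here is a hypothetical sketch of such a client-side resolver. All names are invented for illustration and are not the committed HIVE-439 API; the average-file-size heuristic is the one mentioned at the start of this thread.

        import java.io.File;

        public class ConditionalResolverSketch {

            // Decides, after the upstream tasks finish, which child task to run.
            interface Resolver {
                int resolveTaskIndex(Object ctx);
            }

            // Run the merge task only when the average output file is smaller
            // than a threshold; otherwise go straight to the move task.
            static class AverageSizeResolver implements Resolver {
                private final long threshold;

                AverageSizeResolver(long threshold) {
                    this.threshold = threshold;
                }

                @Override
                public int resolveTaskIndex(Object ctx) {
                    File[] files = new File((String) ctx).listFiles();
                    if (files == null || files.length == 0) {
                        return 1; // nothing to merge: skip to the move task
                    }
                    long total = 0;
                    for (File f : files) {
                        total += f.length();
                    }
                    return (total / files.length) < threshold ? 0 : 1; // 0 = merge, 1 = move
                }
            }

            public static void main(String[] args) {
                Resolver resolver = new AverageSizeResolver(256000000L);
                System.out.println("task index: " + resolver.resolveTaskIndex("/tmp/hive-out"));
            }
        }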

        Zheng Shao added a comment -

        Got it. Do we want to make the whole plan (containing all tasks) serializable? With that we will be able to compile the job once and rerun it many times in the future.

        Namit Jain added a comment -

        I think this is outside the scope of this patch. I can open a new patch and work on it sometime later.

        Zheng Shao added a comment -

        OK. Please open jiras for 2 follow-ups:
        1. Be able to serialize the whole plan and run it later.
        2. Provide a way to monitor the progress of a query (which consists of multiple tasks). Item 2 needs to be done before we can deploy this internally.

        I will test and commit.

        Namit Jain added a comment -

        Some follow-up jiras:
        https://issues.apache.org/jira/browse/HIVE-569
        https://issues.apache.org/jira/browse/HIVE-570

        Zheng Shao added a comment -

        There is a conflict merging the patch. Can you regenerate the test case results?

        Namit Jain added a comment -

        resolved conflicts

        Zheng Shao added a comment -

        Testing. Will commit once the tests succeed.

        A side note:
        One more comment concerns the default value of hive.merge.size.per.mapper. In both HiveConf.java and conf/hive-default.xml, it's set to 10GB. That means most map-only jobs will run this optional task. Is that a bit too large?

        Given the usual DFS block size of 128MB, I think 100MB would be a good number. 100MB means that for an identity select ("select * from"), we won't run this optional task. It also means that if we do run the optional task, the output file size will be around 100MB, which will most probably still be a single block.

        Most problems in our world are caused by very small files, say 1MB or less. So I guess 100MB is probably good enough.

        What do you think?

        Zheng Shao added a comment -

        Sorry, the test failed because of the recent commit of HIVE-338. Can you svn up and merge?

        Also, the trunk is failing some of the tests with "java.lang.ClassNotFoundException: org.apache.hadoop.hive.serde2.TestSerDe". That needs some more investigation.

        Namit Jain added a comment -

        Had a discussion with Dhruba regarding the default file size as well -

        1. In the case of an identity select, we do not have a map-reduce job, and therefore no merging is required.
        2. There is no harm in having bigger files as far as the name node is concerned. The only problem is that it will result in fewer reducers, thereby increasing the time for merging. However, a 10GB file should take ~20 minutes on most installations, so it should be OK.
        3. Even if the filter selects most of the rows (select * from T where ...): if T is a big table, we don't want to create small versions of T for the selected table.
        So, a big size is better.

        I am generating the patch after resolving conflicts - will upload again.

        Namit Jain added a comment -

        The tests seem to work for me - I will upload the patch again, but I haven't changed anything.

        Namit Jain added a comment -

        resolved

        Zheng Shao added a comment -

        Testing now. Will commit when test succeeds.

        Zheng Shao added a comment -

        Committed. There is a small change: the default value for hive.merge.size.per.mapper is changed to 1000000000 (1GB) from 10GB.
        Thanks Namit!


          People

          • Assignee: Namit Jain
          • Reporter: Namit Jain
          • Votes: 0
          • Watchers: 3
