Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      Hive doesn't integrate with Hadoop's OutputCommitter; it uses a NullOutputCommitter and relies on its own commit logic spread across FileSinkOperator, MoveTask, and Hive.

      The Hadoop community is building an OutputCommitter that integrates with S3Guard and does a safe, coordinated commit of data on S3 inside individual tasks (HADOOP-13786). If Hive can integrate with this new OutputCommitter, Hive-on-S3 would gain a lot of benefits:

      • Data is only written once; directly committing data at a task level means no renames are necessary
      • The commit is done safely, in a coordinated manner; duplicate tasks (from task retries or speculative execution) should not step on each other
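
      For reference, the integration point is Hadoop's {{org.apache.hadoop.mapreduce.OutputCommitter}} contract. Below is a minimal sketch of the hooks Hive would have to drive instead of its FileSinkOperator/MoveTask logic; the {{HiveS3Committer}} class name is hypothetical, and only the OutputCommitter methods come from Hadoop.

      {code:java}
      import java.io.IOException;

      import org.apache.hadoop.mapreduce.JobContext;
      import org.apache.hadoop.mapreduce.JobStatus;
      import org.apache.hadoop.mapreduce.OutputCommitter;
      import org.apache.hadoop.mapreduce.TaskAttemptContext;

      // Hypothetical skeleton showing the hooks Hive would have to call
      // instead of its current NullOutputCommitter + MoveTask rename logic.
      public class HiveS3Committer extends OutputCommitter {
        @Override
        public void setupJob(JobContext jobContext) throws IOException {
          // create job-level staging state
        }

        @Override
        public void setupTask(TaskAttemptContext taskContext) throws IOException {
          // per-task-attempt setup, e.g. a task attempt staging dir
        }

        @Override
        public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
          return true; // only attempts that produced output need committing
        }

        @Override
        public void commitTask(TaskAttemptContext taskContext) throws IOException {
          // promote FileSinkOperator output, once per successful attempt
        }

        @Override
        public void abortTask(TaskAttemptContext taskContext) throws IOException {
          // discard output of failed or speculative attempts
        }

        @Override
        public void commitJob(JobContext jobContext) throws IOException {
          // make all task output visible; replaces MoveTask's rename step
        }

        @Override
        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
          // clean up anything the job wrote but never committed
        }
      }
      {code}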

        Issue Links

          Activity

          fabbri Aaron Fabbri added a comment -

          Just FYI for watchers: the S3 Output Committer has been merged to trunk in Hadoop Common (HADOOP-13786).

          stevel@apache.org Steve Loughran added a comment -

          Thanks for starting this

          1. We're making changes to FileOutputFormat so that it doesn't require an instance of FileOutputCommitter, just any committer which also supplies a working directory. This lets us add new committers alongside the existing one, without playing games trying to subclass what is already a complex class.
          2. All work is focused on getting the Netflix "staging" committer out the door first; the other one, which I'd started before Netflix offered theirs, does things inside S3A which could best be viewed as "dark magic". It will offer even more performance, but I'm neglecting it for now. The Netflix one is in use in production, and has all its failure/abort algorithms thought out and implemented.
          3. I'm keeping the magic committer tests working, but I'm not going to consider that one ready to use until it passes lots of tests. Consider it a speedup for the future.

          The Netflix committer itself has two subclasses, "directory" and "partitioned". The directory one propagates a directory tree; the partitioned one expects paths like "dateint=20161116/hour=14" and has a different conflict policy than the directory one.
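
          As a rough sketch of how a job could select between the two variants, assuming the property names currently used in the HADOOP-13786 branch documentation (they may still change before the patch lands):

          {code:java}
          import org.apache.hadoop.conf.Configuration;

          public class CommitterSelectionSketch {
            public static void main(String[] args) {
              Configuration conf = new Configuration();
              // Route s3a:// output through the new committer factory
              // instead of the default FileOutputCommitter.
              conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
                  "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
              // "directory" propagates a whole directory tree; "partitioned" expects
              // Hive-style partition paths such as dateint=20161116/hour=14.
              conf.set("fs.s3a.committer.name", "partitioned");
              // Conflict policy applied per partition: fail, append, or replace.
              conf.set("fs.s3a.committer.staging.conflict-mode", "replace");
            }
          }
          {code}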

          The algorithm for the staging committer is (see the sketch after this list):

          1. tasks write to a local temp dir
          2. task abort: delete the files
          3. task commit: PUT the files as multipart uploads to their final destinations, but do not complete the PUT. Instead, the data needed for the commit is saved to the cluster FS and committed using the normal algorithm
          4. job commit: load the output of all committed tasks and commit them. Failure to commit triggers a revert: delete all files already committed, abort the rest of the list.
          5. job abort: abort the output of all uncommitted tasks by reading in the files and aborting those uploads.
          6. retry logic? Whatever is implemented by the AWS SDK (multiple attempts to POST/PUT parts) and in S3A (retries of that final commit POST)
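
          A hypothetical pseudocode rendering of that flow; the PendingUpload type and the helper methods are placeholders for illustration, not the real classes from the patch:

          {code:java}
          import java.io.File;
          import java.util.ArrayList;
          import java.util.List;

          // Illustrative sketch only: shows the shape of the task/job commit flow.
          public class StagingCommitSketch {

            static class PendingUpload { String key; String uploadId; List<String> partEtags; }

            // Steps 1 and 3: the task writes to a local temp dir, then task commit
            // uploads each file as an *uncompleted* multipart upload and saves the
            // commit data to the cluster FS using the normal commit algorithm.
            List<PendingUpload> commitTask(List<File> localTempFiles) {
              List<PendingUpload> pending = new ArrayList<>();
              for (File f : localTempFiles) {
                pending.add(startMultipartUpload(f)); // data lands in S3 but is not visible yet
              }
              savePendingToClusterFs(pending);
              return pending;
            }

            // Step 4: job commit completes every pending upload; a failure triggers
            // revert (delete what is already visible, abort the rest).
            void commitJob(List<PendingUpload> allPending) {
              List<PendingUpload> completed = new ArrayList<>();
              try {
                for (PendingUpload u : allPending) {
                  completeMultipartUpload(u); // the object appears atomically here
                  completed.add(u);
                }
              } catch (RuntimeException e) {
                for (PendingUpload u : completed) { deleteVisibleObject(u); }
                for (PendingUpload u : allPending) {
                  if (!completed.contains(u)) { abortMultipartUpload(u); }
                }
                throw e;
              }
            }

            // Step 5: job abort reads the pending files back and aborts every upload.
            void abortJob(List<PendingUpload> allPending) {
              for (PendingUpload u : allPending) { abortMultipartUpload(u); }
            }

            // Placeholder S3 / cluster-FS operations (assumptions, not real APIs):
            PendingUpload startMultipartUpload(File f) { return new PendingUpload(); }
            void completeMultipartUpload(PendingUpload u) { }
            void abortMultipartUpload(PendingUpload u) { }
            void deleteVisibleObject(PendingUpload u) { }
            void savePendingToClusterFs(List<PendingUpload> pending) { }
          }
          {code}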

          Nothing is visible until job commit; there's still a window of non-atomicity there, but it's the time for N POSTs, where N = number of files; this can be parallelised easily as each POST uses little bandwidth (unlike the uploads).

          In tests, the dir committer works for the intermediate output of MR jobs saving data to part-000x directories; the partitioned one is good for Spark output, which doesn't save the intermediate data and wants to output partitioned-style.

          stakiar Sahil Takiar added a comment -

          The patch is still a WIP, but since using it will require significant changes in Hive, I thought I would file a JIRA early. A detailed explanation of the work can be found here: https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

          My high-level understanding is that there will be two different {{OutputCommitter}}s in this patch, and they both rely on the following idea:

          Our proposal for committing work without rename is: delayed completion of multi-part PUT operations

          That is: tasks write all data as multipart uploads, but delay the final commit action until the final, single job commit action. Only data committed in the job commit action will be made visible; work from speculative and failed tasks will not be instantiated. As there is no rename, there is no delay while data is copied from a temporary directory to the final directory. The duration of the commit will be the time needed to determine which commit operations to construct, and to execute them.

          The workers upload the data, but the job committer finalizes all uploads

          This is the key point of the algorithm. The data is uploaded, awaiting instantiation, but it doesn't appear in the object store until the final job commit operation completes the outstanding multipart uploads. At that point the new files become visible, with each object's instantiation being atomic.
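
          To make that concrete, here is what delayed completion of a multipart upload looks like with the raw AWS SDK for Java. This only illustrates the S3 primitive the committers build on; the bucket, key, and file values are made up, and none of this is the committer code itself:

          {code:java}
          import com.amazonaws.services.s3.AmazonS3;
          import com.amazonaws.services.s3.AmazonS3ClientBuilder;
          import com.amazonaws.services.s3.model.*;

          import java.io.File;
          import java.util.Collections;
          import java.util.List;

          public class DelayedCompletionExample {
            public static void main(String[] args) {
              AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
              String bucket = "example-bucket";            // made-up example values
              String key = "warehouse/table/part-00000";
              File data = new File("/tmp/part-00000");

              // Task side: upload the data, but do NOT complete the upload.
              InitiateMultipartUploadResult init = s3.initiateMultipartUpload(
                  new InitiateMultipartUploadRequest(bucket, key));
              UploadPartResult part = s3.uploadPart(new UploadPartRequest()
                  .withBucketName(bucket).withKey(key)
                  .withUploadId(init.getUploadId())
                  .withPartNumber(1)
                  .withFile(data)
                  .withPartSize(data.length()));
              List<PartETag> etags = Collections.singletonList(part.getPartETag());
              // Nothing is visible at s3://example-bucket/warehouse/table/part-00000 yet.

              // Job commit: completing the upload makes the object appear atomically.
              s3.completeMultipartUpload(
                  new CompleteMultipartUploadRequest(bucket, key, init.getUploadId(), etags));

              // Job abort (alternative path): abort instead of complete, nothing appears.
              // s3.abortMultipartUpload(
              //     new AbortMultipartUploadRequest(bucket, key, init.getUploadId()));
            }
          }
          {code}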

          One of the committers is based on a committer from Netflix: https://github.com/rdblue/s3committer

          sershe Sergey Shelukhin added a comment -

          How does it handle cross-task commit for retries/failures etc.? I am not very familiar with it.

          stakiar Sahil Takiar added a comment -

          CC: Sergey Shelukhin - this may have some conflict / overlap with Micromanaged tables (HIVE-14535), so I wanted to get your thoughts / opinion on this.


            People

            • Assignee:
              stakiar Sahil Takiar
            • Reporter:
              stakiar Sahil Takiar
            • Votes:
              0
            • Watchers:
              5
