Apache Hudi / HUDI-309

General Redesign of Archived Timeline for efficient scan and management

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.0.0-beta1
    • Component/s: Common Core
    • Labels: None

    Description

      As designed by Vinoth:

      Goals

      1. Archived Metadata should be scannable in the same way as data
      2. Provides more safety by always serving committed data independent of when the corresponding commit action was attempted. Currently, we implicitly assume a data file to be valid if its commit time is older than the earliest time in the active timeline. While this works OK, any inherent bugs in rollback could inadvertently expose a possibly duplicate file once its commit timestamp becomes older than that of any commit in the timeline.
      3. We had to deal with a lot of corner cases because of the way we treat a "commit" as special after it gets archived. Examples include the Savepoint handling logic in the cleaner.
      4. Small Files: for cloud stores, archiving simply moves files from one directory to another, causing the archive folder to accumulate many small files. We need a way to efficiently compact these files while staying friendly to scans.

      Design:

       The basic file-group abstraction for managing file versions of data files can be extended to managing archived commit metadata. The idea is to use an optimal format (like HFile) for storing compacted versions of <commitTime, Metadata> pairs. Every archiving run will read <commitTime, Metadata> pairs from the active timeline and append them to indexable log files. We will run periodic minor compactions to merge multiple log files into a compacted HFile storing metadata for a time range. It should also be noted that we will partition by action type (commit/clean). This design would allow the archived timeline to be queried to determine whether a given instant is valid or not.

      Attachments

        1. Archived Timeline Notes by Vinoth 2.jpg
          92 kB
          Balaji Varadarajan
        2. Archive TImeline Notes by Vinoth 1.jpg
          102 kB
          Balaji Varadarajan


          Activity

            vbalaji Balaji Varadarajan added a comment - Look at HUDI-167 for more details.
            vinoth Vinoth Chandar added a comment - https://issues.apache.org/jira/browse/HUDI-261 for more as well

            vbalaji Balaji Varadarajan added a comment -

            https://github.com/apache/incubator-hudi/blob/23b303e4b17c5f7b603900ee5b0d2e6718118014/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L860

                if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
                    logger.info("Cleaning up older rollback meta files");
                    // Cleanup of older cleaner meta files
                    // TODO - make the commit archival generic and archive rollback metadata
                    FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
                            table.getActiveTimeline().getRollbackTimeline().getInstants());
                }

            As part of PR-942, the above code was removed since this is handled elsewhere. Just noting that we need to ensure cleaner commits are also handled correctly for archiving.

            rxu Raymond Xu added a comment -

            vinoth

            Just trying to understand the new design... could you help clarify the advantage of partitioning by action types? Is it to avoid redundant scanning? Would it be better to partition by datetime for even distribution?

            vinoth Vinoth Chandar added a comment -

            rxu We typically look up the timeline by its action type (commits, deltacommits), like the code above; it just matches that. One of the main goals in this effort is to be able to "look up" the state of any instant/action and make decisions..

            A lot of things are still TBD. But this is the high level idea atm. 

            nicholasjiang Nicholas Jiang added a comment -

            Hi, vinoth & vbalaji & rxu. In my opinion, partitioning by commitTime could avoid generating many small files, with the optimal format storing compacted versions of <partitionId, Metadata> pairs. Based on partitioning by commitTime, we could first route to the right partition when scanning the archived timeline, then find the metadata mapped to that partition.

            rxu Raymond Xu added a comment -

            nicholasjiang I think the design in "Description" is meant to treat each <commitTime, metadata> pair as an event (like a message consumed from Kafka) and append it to some metadata log files, which are later compacted into HFiles for future query purposes. I think for the same active timeline, commitTime is the natural identifier for commit metadata. (vinoth vbalaji please correct me if I misunderstand it)

            So far my concern with this design is the action-type partition: as the action types are fixed to a small number, the files under each type partition will keep growing. Eventually, if too many files accumulate under a particular partition (say "commit/"), would that cause issues with scanning?

            How about, as nicholasjiang also points out, partitioning by "commitTime" (converted to "yyyy/MM/dd" or "yyyy/MM", or configurable), which would set a certain upper bound on the number of files per partition?

            vinoth Vinoth Chandar added a comment -

            > the same active timeline, commitTime is the natural identifier for commit metadata.
            commitTime + actionType + state would be the identifier..

            > the files under each type partition will keep growing. Eventually if too many files accumulated under a particular partition (say "commit/"), would that cause issue on scanning?
            Since we would also compact these, as you mention, it won't grow unbounded actually?

            The rationale for partitioning by action type comes from how we access these in code (the active timeline will remain as-is): we typically obtain a specific timeline of actions - cleanTimeline, commitTimeline, deltaCommitTimeline, etc. We can sub-partition by commitTime if it helps speed things up; that seems like a scalable approach, and we could incorporate both. Without partitioning by action, we would have to scan the entire range of the timeline for each action..
            All that said, what we have here is a strawman proposal. We will have to do an RFC and flesh these out more.

            nicholasjiang Nicholas Jiang added a comment -

            Thanks vinoth and rxu for the explanation. At present I am more concerned about what this plan looks like, and whether I can participate in the development of subtasks. I have much interest in this module refactor.

            vinoth Vinoth Chandar added a comment -

            vbalaji is actually driving the plan (maybe you should own the JIRA?).. Can you please respond to Nicholas.

            nicholasjiang Nicholas Jiang added a comment -

            vinoth Thanks very much. And vbalaji, what's your plan for this refactor? I am interested in cooperating with you to complete it.

            vinoth Vinoth Chandar added a comment -

            vbalaji ^^ 


            vbalaji Balaji Varadarajan added a comment -

            nicholasjiang rxu : Sorry for the late reply. Yes, we would need multi-level partitioning (actionType, commitTime) to make the storage structure efficient for timeline query use-cases.

            I would be more than happy to collaborate with you on discussing further and coming up with a final, complete design. Would it work for you if we revive this discussion in January? I was planning to work on this once I finish up the Bootstrap proposal for Hudi and am back from vacation.

            nicholasjiang Nicholas Jiang added a comment -

            vbalaji Yeah, I would like to collaborate with you on this redesign and will wait so we can refactor this together. And we could discuss further in the Hudi Slack group, if that works for you?


            vbalaji Balaji Varadarajan added a comment -

            nicholasjiang : Absolutely.

            rxu Raymond Xu added a comment -

            vbalaji Thanks for the clarification. Yes, January would be great for me as well (vacation too).

            rxu Raymond Xu added a comment -

            Made a design diagram for ease of discussion

            vinoth Vinoth Chandar added a comment -

            Folks, I think we would agree this is a large effort with potential overlap with RFC-15.. I was thinking about a way to make progress here on this specific problem and unblock other projects along the way.

            Specific problem: during write operations, we cache the input using Spark caching and compute a workload profile for purposes of file sizing etc. We also persist this information in the inflight commit/deltacommit file, for doing rollbacks. i.e. if the write fails midway leaving a .inflight commit/deltacommit, then upon the next write we will read the workload profile written into the commit/deltacommit and attempt to delete leftover files or log rollback blocks into log files to nullify the partial writes we might have written... Note that we will not read base or log files that are inflight in the active timeline, by checking whether the instant was inflight. But if we don't perform any rollback action and enough time passes, then this instant will be archived, and that's where the trouble is. Once an instant goes into the archived timeline today, there is no way to check its individual state (inflight vs completed).. and this is what this JIRA was trying to handle in a generic way, so that the in-memory caching requirement is not relied on in this functionally critical way.

            Thinking back, I think we can shelve this JIRA as a longer-term effort and use an alternate approach to solve the specific problem above. During each write (from the Create and Merge handles; code is in HoodieTable.java) we already write out marker files under .hoodie that correspond 1-1 with each file being created or merged today. In case of a partial write, these marker files might be left behind (we need to ensure in code that we commit first and then delete markers), and we can directly use them to perform the rollback... (note that we need to handle backwards compatibility with existing timelines, and also support downgrades to the old way)

            Let me know if this makes sense in a general way.. We can file a separate JIRA and get working on it..

            xleesf vbalaji vinoyang nagarwal

            vinoth Vinoth Chandar added a comment -

            Closing the loop here.. https://issues.apache.org/jira/browse/HUDI-839 tracks this..

            Untagging this from the 0.6.0 release

            danny0405 Danny Chen added a comment - Related PR: https://github.com/apache/hudi/pull/9209
            vinoth Vinoth Chandar added a comment -

            I think we can close this, given 9209 has landed?


            People

              Assignee: danny0405 Danny Chen
              Reporter: vbalaji Balaji Varadarajan
              Votes: 0
              Watchers: 7
