Spark / SPARK-28945

Allow concurrent writes to different partitions with dynamic partition overwrite

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: SQL

    Description

      It is desirable to run concurrent jobs that write to different partitions within the same baseDir, using partitionBy and dynamic partitionOverwriteMode.

      See for example here:
      https://stackoverflow.com/questions/38964736/multiple-spark-jobs-appending-parquet-data-to-same-base-path-with-partitioning

      Or the discussion here:
      https://github.com/delta-io/delta/issues/9

      This doesn't seem that difficult. I suspect the only changes needed are in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol, which already has a flag for dynamicPartitionOverwrite. I got a quick test to work by disabling all committer activity (committer.setupJob, committer.commitJob, etc.) when dynamicPartitionOverwrite is true.
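      The semantics being requested can be illustrated with a small self-contained sketch (plain Python, no Spark; the function name and layout are illustrative, not Spark APIs): dynamic partition overwrite replaces only the partitions present in the new data and leaves the other partitions under baseDir untouched, which is why two jobs touching disjoint partitions should in principle be able to run concurrently.

```python
import os
import shutil
import tempfile

def dynamic_partition_overwrite(base_dir, new_data):
    """Overwrite only the partitions present in new_data.

    new_data maps a partition value (e.g. "date=2019-09-01") to a list of
    (filename, contents) pairs. Partitions not mentioned are left untouched,
    which is what distinguishes dynamic overwrite from static overwrite
    (which would wipe base_dir first).
    """
    for partition, files in new_data.items():
        part_dir = os.path.join(base_dir, partition)
        if os.path.isdir(part_dir):
            shutil.rmtree(part_dir)  # replace this partition only
        os.makedirs(part_dir)
        for name, contents in files:
            with open(os.path.join(part_dir, name), "w") as f:
                f.write(contents)

base = tempfile.mkdtemp()
# An existing table with two partitions.
dynamic_partition_overwrite(base, {
    "date=2019-09-01": [("part-0", "old a")],
    "date=2019-09-02": [("part-0", "old b")],
})
# A job writing only date=2019-09-02 replaces that partition alone.
dynamic_partition_overwrite(base, {"date=2019-09-02": [("part-0", "new b")]})
print(sorted(os.listdir(base)))  # ['date=2019-09-01', 'date=2019-09-02']
```

      In Spark itself this corresponds to setting spark.sql.sources.partitionOverwriteMode=dynamic and writing with mode("overwrite"); the open question in this ticket is whether the commit protocol is safe when two such jobs run at once.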

    Attachments

    Issue Links

    Activity

            koert koert kuipers added a comment - See also: https://mail-archives.apache.org/mod_mbox/spark-dev/201909.mbox/%3CCANx3uAinvf2LdtKfWUsykCJ%2BkHh6oYy0Pt_5LvcTSURGmQKQwg%40mail.gmail.com%3E
            cloud_fan Wenchen Fan added a comment -

            cc advancedxy do you want to work on it?

            advancedxy YE added a comment -

            Yeah, I can work on this. 

            stevel@apache.org Steve Loughran added a comment -

            It's a core part of the Hadoop MR commit protocols. I think the best (only!) docs of these, other than the most confusing piece of co-recursive code I've ever had to step through while taking notes, are: https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_draft_005

            Every MR app attempt has its own attempt ID; when the Hadoop MR engine restarts attempt N, it looks for the temp dir of attempt N-1 and can use it to recover from failure. Spark's solution to the app-restart problem is "be faster and fix failures by restarting entirely", so the app attempt is always 0.

            If you have two jobs writing to the same destination path, their output is inevitably going to conflict: the first job commit will delete the attempt dir, so the second will fail.

            1. You need to (somehow) get a different attempt ID for each job to avoid that clash.
            2. Jobs need to set "mapreduce.fileoutputcommitter.cleanup.skipped" to true to avoid a full cleanup of _temporary on job commit. That carries a risk of leaking temp files after job failures.
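            The clash described above can be reproduced with a toy model (plain Python; the function names are illustrative, not the actual Hadoop committer code): both jobs stage output under the same `_temporary/0` attempt directory because the app attempt is always 0, and the first job's commit removes the whole `_temporary` tree, so the second job's commit finds its pending files gone.

```python
import os
import shutil
import tempfile
from pathlib import Path

ATTEMPT_ID = 0  # Spark always runs as app attempt 0

def setup_job(dest, job):
    # Both jobs stage under the same _temporary/<attempt> tree,
    # because the attempt ID never varies.
    staging = os.path.join(dest, "_temporary", str(ATTEMPT_ID), job)
    os.makedirs(staging, exist_ok=True)
    return staging

def commit_job(dest, staging, filename):
    src = os.path.join(staging, filename)
    if not os.path.exists(src):
        raise FileNotFoundError("staged output missing: " + src)
    shutil.move(src, os.path.join(dest, filename))
    # Cleanup deletes the whole _temporary tree, taking every other
    # in-flight job's staged files with it.
    shutil.rmtree(os.path.join(dest, "_temporary"))

dest = tempfile.mkdtemp()
s1 = setup_job(dest, "job1")
s2 = setup_job(dest, "job2")
Path(s1, "part-a").write_text("a")
Path(s2, "part-b").write_text("b")

commit_job(dest, s1, "part-a")  # job 1 commits and wipes _temporary
second_failed = False
try:
    commit_job(dest, s2, "part-b")  # job 2's staged file is already gone
except FileNotFoundError:
    second_failed = True
print("second commit failed:", second_failed)  # second commit failed: True
```

            A per-job attempt ID (point 1 above) would give each job its own `_temporary/<attempt>` subtree, so one job's commit and cleanup could no longer destroy the other's staged files.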
            hzfeiwang feiwang added a comment (edited)

            cloud_fan advancedxy
            Hi, I think the exception shown in the email (https://mail-archives.apache.org/mod_mbox/spark-dev/201909.mbox/%3CCANx3uAinvf2LdtKfWUsykCJ%2BkHh6oYy0Pt_5LvcTSURGmQKQwg%40mail.gmail.com%3E) is related to this PR (https://github.com/apache/spark/pull/25795).

            When dynamicPartitionOverwrite is true, we should skip commitJob.

            advancedxy YE added a comment -

            > Hi, I think the exception shown in the email (https://mail-archives.apache.org/mod_mbox/spark-dev/201909.mbox/%3CCANx3uAinvf2LdtKfWUsykCJ%2BkHh6oYy0Pt_5LvcTSURGmQKQwg%40mail.gmail.com%3E) is related to this PR (https://github.com/apache/spark/pull/25795).

            > When dynamicPartitionOverwrite is true, we should skip commitJob.


            Well, it's not that simple. If you are going to skip commitJob for dynamicPartitionOverwrite, you should skip setupJob and abortJob for consistency.

            As we discussed offline, I believe the fix in my PR https://github.com/apache/spark/pull/25739 should cover concurrent writes with dynamicPartitionOverwrite. Adding an output-existence check and setting a specific output path for strict dynamic partition overwrite should cover your cases too.

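            The consistency point can be sketched as follows (plain Python; the class and method names are hypothetical, modeled loosely on HadoopMapReduceCommitProtocol rather than taken from Spark): if commitJob is skipped when dynamicPartitionOverwrite is set, then setupJob and abortJob must be guarded by the same flag, otherwise a job could set up committer state that is never committed or torn down.

```python
class CommitProtocolSketch:
    """Toy model: every committer lifecycle hook honors the same flag."""

    def __init__(self, dynamic_partition_overwrite):
        self.dynamic_partition_overwrite = dynamic_partition_overwrite
        self.calls = []  # record which committer hooks actually ran

    def _run_committer(self, hook):
        self.calls.append(hook)

    def setup_job(self):
        if not self.dynamic_partition_overwrite:
            self._run_committer("setupJob")

    def commit_job(self):
        if not self.dynamic_partition_overwrite:
            self._run_committer("commitJob")

    def abort_job(self):
        if not self.dynamic_partition_overwrite:
            self._run_committer("abortJob")

dynamic = CommitProtocolSketch(dynamic_partition_overwrite=True)
dynamic.setup_job(); dynamic.commit_job(); dynamic.abort_job()
print(dynamic.calls)  # [] -- all hooks skipped together, no dangling state

static = CommitProtocolSketch(dynamic_partition_overwrite=False)
static.setup_job(); static.commit_job()
print(static.calls)  # ['setupJob', 'commitJob']
```

            Skipping only commitJob, by contrast, would leave the setupJob state from the committer with no matching commit or abort, which is exactly the inconsistency the comment warns about.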
            hzfeiwang feiwang added a comment (edited)

            advancedxy
            Thanks. Hope SPARK-28945 can be merged soon.
            It is important for data quality.

            koert koert kuipers added a comment -

            I understand there is a great deal of complexity in the committer and this might require more work to get right.

            But it's still unclear to me whether the committer is doing anything at all in the case of dynamic partition overwrite.
            What do I lose by disabling all committer activity (committer.setupJob, committer.commitJob, etc.) when dynamicPartitionOverwrite is true? And if I lose nothing, is that a good thing, or does it mean I should be worried about the current state?


            People

              Assignee: Unassigned
              Reporter: koert kuipers
              Votes: 5
              Watchers: 9
